Documentation ¶
Overview ¶
Example (ConsumingEventsUsingConsumerClient) ¶
Shows how to start consuming events from partitions in an Event Hub using the ConsumerClient.
If you have an Azure Storage account you can use the Processor type instead, which will handle distributing partitions between multiple consumers and storing progress using checkpoints. See example_consuming_with_checkpoints_test.go for an example.
package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {
	eventHubNamespace := os.Getenv("EVENTHUB_NAMESPACE") // <ex: myeventhubnamespace.servicebus.windows.net>
	eventHubName := os.Getenv("EVENTHUB_NAME")
	partitionID := os.Getenv("EVENTHUB_PARTITION_ID")

	fmt.Printf("Event Hub Namespace: %s, hubname: %s\n", eventHubNamespace, eventHubName)

	defaultAzureCred, err := azidentity.NewDefaultAzureCredential(nil)

	if err != nil {
		panic(err)
	}

	// Can also use a connection string:
	//
	// consumerClient, err = azeventhubs.NewConsumerClientFromConnectionString(connectionString, eventHubName, azeventhubs.DefaultConsumerGroup, nil)
	//
	consumerClient, err := azeventhubs.NewConsumerClient(eventHubNamespace, eventHubName, azeventhubs.DefaultConsumerGroup, defaultAzureCred, nil)

	if err != nil {
		panic(err)
	}

	defer consumerClient.Close(context.TODO())

	partitionClient, err := consumerClient.NewPartitionClient(partitionID, &azeventhubs.PartitionClientOptions{
		StartPosition: azeventhubs.StartPosition{
			Earliest: to.Ptr(true),
		},
	})

	if err != nil {
		panic(err)
	}

	defer partitionClient.Close(context.TODO())

	// Will wait up to 1 minute for 100 events. If the context is cancelled (or expires)
	// you'll get any events that have been collected up to that point.
	receiveCtx, cancel := context.WithTimeout(context.TODO(), time.Minute)
	events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
	cancel()

	if err != nil && !errors.Is(err, context.DeadlineExceeded) {
		panic(err)
	}

	for _, event := range events {
		// We're assuming the Body is a byte-encoded string. EventData.Body supports any payload
		// that can be encoded to []byte.
		fmt.Printf("Event received with body '%s'\n", string(event.Body))
	}

	fmt.Printf("Done receiving events\n")
}
Output:
Example (ConsumingEventsWithCheckpoints) ¶
Shows how to use the Processor type, using a ConsumerClient and CheckpointStore.
The Processor type acts as a load balancer, ensuring that partitions are divided up evenly amongst active Processor instances. It also allows storing (and restoring) checkpoints of progress.
NOTE: If you want to manually allocate partitions or to have more control over the process you can use the ConsumerClient. See example_consuming_events_test.go for an example.
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
)

// Shows how to use the [Processor] type, using a [ConsumerClient] and [CheckpointStore].
//
// The Processor type acts as a load balancer, ensuring that partitions are divided up evenly
// amongst active Processor instances. It also allows storing (and restoring) checkpoints of progress.
//
// NOTE: If you want to manually allocate partitions or to have more control over the process you can use
// the [ConsumerClient]. See [example_consuming_events_test.go] for an example.
//
// [example_consuming_events_test.go]: https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs/example_consuming_events_test.go
func main() {
	// The Processor makes it simpler to do distributed consumption of an Event Hub.
	// It automatically coordinates with other Processor instances to ensure balanced
	// allocation of partitions and tracks status, durably, in a CheckpointStore.
	//
	// The built-in checkpoint store (available in the `azeventhubs/checkpoints` package) uses
	// Azure Blob storage.
	eventHubNamespace := os.Getenv("EVENTHUB_NAMESPACE")
	eventHubName := os.Getenv("EVENTHUB_NAME")
	storageEndpoint := os.Getenv("CHECKPOINTSTORE_STORAGE_ENDPOINT")
	storageContainerName := os.Getenv("CHECKPOINTSTORE_STORAGE_CONTAINER_NAME")

	if eventHubName == "" || eventHubNamespace == "" || storageEndpoint == "" || storageContainerName == "" {
		fmt.Fprintf(os.Stderr, "Skipping example, environment variables missing\n")
		return
	}

	consumerClient, checkpointStore, err := createClientsForExample(eventHubNamespace, eventHubName, storageEndpoint, storageContainerName)

	if err != nil {
		// TODO: Update the following line with your application specific error handling logic
		log.Printf("ERROR: %s", err)
		return
	}

	defer consumerClient.Close(context.TODO())

	// Create the Processor
	//
	// The Processor handles load balancing with other Processor instances, running in separate
	// processes or even on separate machines. Each one will use the checkpointStore to coordinate
	// state and ownership, dynamically.
	processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)

	if err != nil {
		// TODO: Update the following line with your application specific error handling logic
		log.Printf("ERROR: %s", err)
		return
	}

	// Run in the background, launching goroutines to process each partition
	go dispatchPartitionClients(processor)

	// Run the load balancer. The dispatchPartitionClients goroutine (launched above)
	// will receive and dispatch ProcessorPartitionClients as partitions are claimed.
	//
	// Stopping the processor is as simple as canceling the context that you passed
	// in to Run.
	processorCtx, processorCancel := context.WithCancel(context.TODO())
	defer processorCancel()

	if err := processor.Run(processorCtx); err != nil {
		// TODO: Update the following line with your application specific error handling logic
		log.Printf("ERROR: %s", err)
		return
	}
}

func dispatchPartitionClients(processor *azeventhubs.Processor) {
	for {
		processorPartitionClient := processor.NextPartitionClient(context.TODO())

		if processorPartitionClient == nil {
			// Processor has stopped
			break
		}

		go func() {
			if err := processEventsForPartition(processorPartitionClient); err != nil {
				// TODO: Update the following line with your application specific error handling logic
				log.Fatalf("ERROR: %s", err)
			}
		}()
	}
}

// processEventsForPartition shows the typical pattern for processing a partition.
func processEventsForPartition(partitionClient *azeventhubs.ProcessorPartitionClient) error {
	// 1. [BEGIN] Initialize any partition specific resources for your application.
	// 2. [CONTINUOUS] Loop, calling ReceiveEvents() and UpdateCheckpoint().
	// 3. [END] Cleanup any resources.
	defer func() {
		// 3/3 [END] Do cleanup here, like shutting down database clients
		// or other resources used for processing this partition.
		shutdownPartitionResources(partitionClient)
	}()

	// 1/3 [BEGIN] Initialize any partition specific resources for your application.
	if err := initializePartitionResources(partitionClient.PartitionID()); err != nil {
		return err
	}

	// 2/3 [CONTINUOUS] Receive events, checkpointing as needed using UpdateCheckpoint.
	log.Printf("Starting to receive for partition %s", partitionClient.PartitionID())

	for {
		// Wait up to a minute for 100 events, otherwise returns whatever we collected during that time.
		receiveCtx, cancelReceive := context.WithTimeout(context.TODO(), time.Minute)
		events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
		cancelReceive()

		if err != nil && !errors.Is(err, context.DeadlineExceeded) {
			var eventHubError *azeventhubs.Error

			if errors.As(err, &eventHubError) && eventHubError.Code == azeventhubs.ErrorCodeOwnershipLost {
				return nil
			}

			return err
		}

		if len(events) == 0 {
			continue
		}

		log.Printf("Received %d event(s)", len(events))

		for _, event := range events {
			log.Printf("Event received with body %v", event.Body)
		}

		// Updates the checkpoint with the latest event received. If processing needs to restart
		// it will restart from this point, automatically.
		if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
			return err
		}
	}
}

func initializePartitionResources(partitionID string) error {
	// initialize things that might be partition specific, like a
	// database connection.
	log.Printf("Initializing partition related resources for partition %s", partitionID)
	return nil
}

func shutdownPartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
	// Each PartitionClient holds onto an external resource and should be closed if you're
	// not processing them anymore.
	defer partitionClient.Close(context.TODO())

	log.Printf("Shutting down partition related resources for partition %s", partitionClient.PartitionID())
}

func createClientsForExample(eventHubNamespace, eventHubName, storageServiceURL, storageContainerName string) (*azeventhubs.ConsumerClient, azeventhubs.CheckpointStore, error) {
	defaultAzureCred, err := azidentity.NewDefaultAzureCredential(nil)

	if err != nil {
		return nil, nil, err
	}

	// NOTE: the storageContainerName must exist before the checkpoint store can be used.
	blobClient, err := azblob.NewClient(storageServiceURL, defaultAzureCred, nil)

	if err != nil {
		return nil, nil, err
	}

	azBlobContainerClient := blobClient.ServiceClient().NewContainerClient(storageContainerName)

	checkpointStore, err := checkpoints.NewBlobStore(azBlobContainerClient, nil)

	if err != nil {
		return nil, nil, err
	}

	consumerClient, err := azeventhubs.NewConsumerClient(eventHubNamespace, eventHubName, azeventhubs.DefaultConsumerGroup, defaultAzureCred, nil)

	if err != nil {
		return nil, nil, err
	}

	return consumerClient, checkpointStore, nil
}
Output:
Example (EnableLogging) ¶
package main

import (
	"fmt"

	azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {
	// print log output to stdout
	azlog.SetListener(printLoggedEvent)

	// pick the set of events to log
	azlog.SetEvents(
		azeventhubs.EventConn,
		azeventhubs.EventAuth,
		azeventhubs.EventProducer,
		azeventhubs.EventConsumer,
	)

	fmt.Printf("Logging enabled\n")
}

func printLoggedEvent(event azlog.Event, s string) {
	fmt.Printf("[%s] %s\n", event, s)
}
Output:
Example (MigrateCheckpoints) ¶
Shows how to migrate from the older `github.com/Azure/azure-event-hubs-go` checkpointer to the format used by this package, `github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints/BlobStore`
NOTE: This example is not safe to run while either the old or new checkpoint store is in-use as it doesn't respect locking or ownership.
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"strconv"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)

type LegacyCheckpoint struct {
	PartitionID string `json:"partitionID"`
	Epoch       int    `json:"epoch"`
	Owner       string `json:"owner"`
	Checkpoint  struct {
		Offset         string `json:"offset"`
		SequenceNumber int64  `json:"sequenceNumber"`
		EnqueueTime    string `json:"enqueueTime"` // ": "0001-01-01T00:00:00Z"
	} `json:"checkpoint"`
}

// Shows how to migrate from the older `github.com/Azure/azure-event-hubs-go` checkpointer to
// the format used by this package, `github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints/BlobStore`
//
// NOTE: This example is not safe to run while either the old or new checkpoint store is in-use as it doesn't
// respect locking or ownership.
func main() {
	// Azure Event Hubs namespace. You can get this from the Azure Portal.
	// For example: youreventhub.servicebus.windows.net
	var EventHubNamespace = os.Getenv("EVENTHUB_NAMESPACE")

	// Name of your Event Hub that these checkpoints reference.
	var EventHubName = os.Getenv("EVENTHUB_NAME")

	// Name of your Event Hub consumer group
	// Example: $Default
	var EventHubConsumerGroup = os.Getenv("EVENTHUB_CONSUMER_GROUP")

	// Azure Storage account connection string. You can get this from the Azure Portal.
	// For example: DefaultEndpointsProtocol=https;AccountName=accountname;AccountKey=account-key;EndpointSuffix=core.windows.net
	var StorageConnectionString = os.Getenv("STORAGE_CONNECTION_STRING")

	// Optional: If you used `eventhub.WithPrefixInBlobPath()` configuration option for your Event Processor Host
	// then you'll need to set this value.
	//
	// NOTE: This is no longer needed with the new checkpoint store as it automatically makes the path unique
	// for each combination of eventhub + hubname + consumergroup + partition.
	var BlobPrefix = os.Getenv("OLD_STORAGE_BLOB_PREFIX")

	// Name of the checkpoint store's Azure Storage container.
	var OldStorageContainerName = os.Getenv("OLD_STORAGE_CONTAINER_NAME")

	// Name of the Azure Storage container to place new checkpoints in.
	var NewStorageContainerName = os.Getenv("NEW_STORAGE_CONTAINER_NAME")

	if EventHubNamespace == "" || EventHubName == "" || EventHubConsumerGroup == "" ||
		StorageConnectionString == "" || OldStorageContainerName == "" || NewStorageContainerName == "" {
		fmt.Printf("Skipping migration, missing parameters\n")
		return
	}

	blobClient, err := azblob.NewClientFromConnectionString(StorageConnectionString, nil)

	if err != nil {
		panic(err)
	}

	oldCheckpoints, err := loadOldCheckpoints(blobClient, OldStorageContainerName, BlobPrefix)

	if err != nil {
		panic(err)
	}

	newCheckpointStore, err := checkpoints.NewBlobStore(blobClient.ServiceClient().NewContainerClient(NewStorageContainerName), nil)

	if err != nil {
		panic(err)
	}

	for _, oldCheckpoint := range oldCheckpoints {
		newCheckpoint := azeventhubs.Checkpoint{
			ConsumerGroup:           EventHubConsumerGroup,
			EventHubName:            EventHubName,
			FullyQualifiedNamespace: EventHubNamespace,
			PartitionID:             oldCheckpoint.PartitionID,
		}

		offset, err := strconv.ParseInt(oldCheckpoint.Checkpoint.Offset, 10, 64)

		if err != nil {
			panic(err)
		}

		newCheckpoint.Offset = &offset
		newCheckpoint.SequenceNumber = &oldCheckpoint.Checkpoint.SequenceNumber

		if err := newCheckpointStore.SetCheckpoint(context.Background(), newCheckpoint, nil); err != nil {
			panic(err)
		}
	}
}

func loadOldCheckpoints(blobClient *azblob.Client, containerName string, customBlobPrefix string) ([]*LegacyCheckpoint, error) {
	blobPrefix := &customBlobPrefix

	if customBlobPrefix == "" {
		blobPrefix = nil
	}

	pager := blobClient.NewListBlobsFlatPager(containerName, &container.ListBlobsFlatOptions{
		Prefix: blobPrefix,
	})

	var checkpoints []*LegacyCheckpoint

	for pager.More() {
		page, err := pager.NextPage(context.Background())

		if err != nil {
			return nil, err
		}

		for _, item := range page.Segment.BlobItems {
			buff := [4000]byte{}

			len, err := blobClient.DownloadBuffer(context.Background(), containerName, *item.Name, buff[:], nil)

			if err != nil {
				return nil, err
			}

			var legacyCheckpoint *LegacyCheckpoint

			if err := json.Unmarshal(buff[0:len], &legacyCheckpoint); err != nil {
				return nil, err
			}

			checkpoints = append(checkpoints, legacyCheckpoint)
		}
	}

	return checkpoints, nil
}
Output:
Example (ProducingEventsUsingProducerClient) ¶
Shows how to send events to an Event Hub partition using the ProducerClient and EventDataBatch.
package main

import (
	"context"
	"errors"
	"os"

	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

// Shows how to send events to an Event Hub partition using the [ProducerClient]
// and [EventDataBatch].
func main() {
	eventHubNamespace := os.Getenv("EVENTHUB_NAMESPACE") // <ex: myeventhubnamespace.servicebus.windows.net>
	eventHubName := os.Getenv("EVENTHUB_NAME")

	defaultAzureCred, err := azidentity.NewDefaultAzureCredential(nil)

	if err != nil {
		panic(err)
	}

	// Can also use a connection string:
	//
	// producerClient, err := azeventhubs.NewProducerClientFromConnectionString(connectionString, eventHubName, nil)
	//
	producerClient, err := azeventhubs.NewProducerClient(eventHubNamespace, eventHubName, defaultAzureCred, nil)

	if err != nil {
		panic(err)
	}

	defer producerClient.Close(context.TODO())

	events := createEventsForSample()

	newBatchOptions := &azeventhubs.EventDataBatchOptions{
		// The options allow you to control the size of the batch, as well as the partition it will get sent to.

		// PartitionID can be used to target a specific partition ID.
		//
		// PartitionID: partitionID,

		// PartitionKey can be used to ensure that messages that have the same key
		// will go to the same partition without requiring your application to specify
		// that partition ID.
		//
		// PartitionKey: partitionKey,
		//
		// Or, if you leave both PartitionID and PartitionKey nil, the service will choose a partition.
	}

	// Creates an EventDataBatch, which you can use to pack multiple events together, allowing for efficient transfer.
	batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)

	if err != nil {
		panic(err)
	}

	for i := 0; i < len(events); i++ {
		err = batch.AddEventData(events[i], nil)

		if errors.Is(err, azeventhubs.ErrEventDataTooLarge) {
			if batch.NumEvents() == 0 {
				// This one event is too large for this batch, even on its own. No matter what we do it
				// will not be sendable at its current size.
				panic(err)
			}

			// This batch is full - we can send it and create a new one and continue
			// packaging and sending events.
			if err := producerClient.SendEventDataBatch(context.TODO(), batch, nil); err != nil {
				panic(err)
			}

			// create the next batch we'll use for events, ensuring that we use the same options
			// each time so all the messages go to the same target.
			tmpBatch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)

			if err != nil {
				panic(err)
			}

			batch = tmpBatch

			// rewind so we can retry adding this event to a batch
			i--
		} else if err != nil {
			panic(err)
		}
	}

	// if we have any events in the last batch, send it
	if batch.NumEvents() > 0 {
		if err := producerClient.SendEventDataBatch(context.TODO(), batch, nil); err != nil {
			panic(err)
		}
	}
}

func createEventsForSample() []*azeventhubs.EventData {
	return []*azeventhubs.EventData{
		{
			Body: []byte("hello"),
		},
		{
			Body: []byte("world"),
		},
	}
}
Output:
Index ¶
- Constants
- Variables
- type AMQPAnnotatedMessage
- type AMQPAnnotatedMessageBody
- type AMQPAnnotatedMessageHeader
- type AMQPAnnotatedMessageProperties
- type AddEventDataOptions
- type Checkpoint
- type CheckpointStore
- type ClaimOwnershipOptions
- type ConnectionStringProperties
- type ConsumerClient
- func (cc *ConsumerClient) Close(ctx context.Context) error
- func (cc *ConsumerClient) GetEventHubProperties(ctx context.Context, options *GetEventHubPropertiesOptions) (EventHubProperties, error)
- func (cc *ConsumerClient) GetPartitionProperties(ctx context.Context, partitionID string, ...) (PartitionProperties, error)
- func (cc *ConsumerClient) InstanceID() string
- func (cc *ConsumerClient) NewPartitionClient(partitionID string, options *PartitionClientOptions) (*PartitionClient, error)
- type ConsumerClientOptions
- type Error
- type ErrorCode
- type EventData
- type EventDataBatch
- type EventDataBatchOptions
- type EventHubProperties
- type GetEventHubPropertiesOptions
- type GetPartitionPropertiesOptions
- type ListCheckpointsOptions
- type ListOwnershipOptions
- type Ownership
- type PartitionClient
- type PartitionClientOptions
- type PartitionProperties
- type Processor
- type ProcessorOptions
- type ProcessorPartitionClient
- func (c *ProcessorPartitionClient) Close(ctx context.Context) error
- func (p *ProcessorPartitionClient) PartitionID() string
- func (c *ProcessorPartitionClient) ReceiveEvents(ctx context.Context, count int, options *ReceiveEventsOptions) ([]*ReceivedEventData, error)
- func (p *ProcessorPartitionClient) UpdateCheckpoint(ctx context.Context, latestEvent *ReceivedEventData, ...) error
- type ProcessorStrategy
- type ProducerClient
- func (pc *ProducerClient) Close(ctx context.Context) error
- func (pc *ProducerClient) GetEventHubProperties(ctx context.Context, options *GetEventHubPropertiesOptions) (EventHubProperties, error)
- func (pc *ProducerClient) GetPartitionProperties(ctx context.Context, partitionID string, ...) (PartitionProperties, error)
- func (pc *ProducerClient) NewEventDataBatch(ctx context.Context, options *EventDataBatchOptions) (*EventDataBatch, error)
- func (pc *ProducerClient) SendEventDataBatch(ctx context.Context, batch *EventDataBatch, options *SendEventDataBatchOptions) error
- type ProducerClientOptions
- type ReceiveEventsOptions
- type ReceivedEventData
- type RetryOptions
- type SendEventDataBatchOptions
- type SetCheckpointOptions
- type StartPosition
- type StartPositions
- type UpdateCheckpointOptions
- type WebSocketConnParams
Examples ¶
- Package (ConsumingEventsUsingConsumerClient)
- Package (ConsumingEventsWithCheckpoints)
- Package (EnableLogging)
- Package (MigrateCheckpoints)
- Package (ProducingEventsUsingProducerClient)
- ConsumerClient.GetEventHubProperties
- ConsumerClient.GetPartitionProperties
- ConsumerClient.NewPartitionClient (ConfiguringPrefetch)
- ConsumerClient.NewPartitionClient (ReceiveEvents)
- EventDataBatch.AddEventData
- EventDataBatch.AddEventData (RawAMQPMessages)
- NewConsumerClient
- NewConsumerClient (UsingCustomEndpoint)
- NewConsumerClientFromConnectionString
- NewProducerClient
- NewProducerClient (UsingCustomEndpoint)
- NewProducerClientFromConnectionString
- ProducerClient.GetEventHubProperties
- ProducerClient.GetPartitionProperties
- ProducerClient.SendEventDataBatch
Constants ¶
const (
	// EventConn is used whenever we create a connection or any links (ie: producers, consumers).
	EventConn log.Event = exported.EventConn

	// EventAuth is used when we're doing authentication/claims negotiation.
	EventAuth log.Event = exported.EventAuth

	// EventProducer represents operations that happen on Producers.
	EventProducer log.Event = exported.EventProducer

	// EventConsumer represents operations that happen on Consumers.
	EventConsumer log.Event = exported.EventConsumer
)
const DefaultConsumerGroup = "$Default"
DefaultConsumerGroup is the name of the default consumer group in the Event Hubs service.
Variables ¶
var ErrEventDataTooLarge = errors.New("the EventData could not be added because it is too large for the batch")
ErrEventDataTooLarge is returned when a message cannot fit into a batch when using the azeventhubs.EventDataBatch.AddEventData function.
Functions ¶
This section is empty.
Types ¶
type AMQPAnnotatedMessage ¶ added in v0.2.0
type AMQPAnnotatedMessage struct {
	// ApplicationProperties corresponds to the "application-properties" section of an AMQP message.
	//
	// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
	ApplicationProperties map[string]any

	// Body represents the body of an AMQP message.
	Body AMQPAnnotatedMessageBody

	// DeliveryAnnotations corresponds to the "delivery-annotations" section in an AMQP message.
	//
	// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
	DeliveryAnnotations map[any]any

	// DeliveryTag corresponds to the delivery-tag property of the TRANSFER frame
	// for this message.
	DeliveryTag []byte

	// Footer corresponds to the "footer" section of an AMQP message.
	//
	// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
	Footer map[any]any

	// Header is the transport headers for this AMQP message.
	Header *AMQPAnnotatedMessageHeader

	// MessageAnnotations corresponds to the message-annotations section of an AMQP message.
	//
	// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
	MessageAnnotations map[any]any

	// Properties corresponds to the properties section of an AMQP message.
	Properties *AMQPAnnotatedMessageProperties
}
AMQPAnnotatedMessage represents the AMQP message, as received from Event Hubs. For details about these properties, refer to the AMQP specification:
https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
Some fields in this struct are typed 'any', which means they will accept AMQP primitives, or in some cases slices and maps.
AMQP simple types include:

- int (any size), uint (any size)
- float (any size)
- string
- bool
- time.Time
type AMQPAnnotatedMessageBody ¶ added in v0.2.0
type AMQPAnnotatedMessageBody struct {
	// Data is encoded/decoded as multiple data sections in the body.
	Data [][]byte

	// Sequence is encoded/decoded as one or more amqp-sequence sections in the body.
	//
	// The values of the slices are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
	Sequence [][]any

	// Value is encoded/decoded as the amqp-value section in the body.
	//
	// The type of Value can be any of the AMQP simple types, as listed in the comment for AMQPAnnotatedMessage,
	// as well as slices or maps of AMQP simple types.
	Value any
}
AMQPAnnotatedMessageBody represents the body of an AMQP message. Only one of these fields can be used at a time; they are mutually exclusive.
type AMQPAnnotatedMessageHeader ¶ added in v0.2.0
type AMQPAnnotatedMessageHeader struct {
	// DeliveryCount is the number of unsuccessful previous attempts to deliver this message.
	// It corresponds to the 'delivery-count' property.
	DeliveryCount uint32

	// Durable corresponds to the 'durable' property.
	Durable bool

	// FirstAcquirer corresponds to the 'first-acquirer' property.
	FirstAcquirer bool

	// Priority corresponds to the 'priority' property.
	Priority uint8

	// TTL corresponds to the 'ttl' property.
	TTL time.Duration
}
AMQPAnnotatedMessageHeader carries standard delivery details about the transfer of a message. See https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-header for more details.
type AMQPAnnotatedMessageProperties ¶ added in v0.2.0
type AMQPAnnotatedMessageProperties struct {
	// AbsoluteExpiryTime corresponds to the 'absolute-expiry-time' property.
	AbsoluteExpiryTime *time.Time

	// ContentEncoding corresponds to the 'content-encoding' property.
	ContentEncoding *string

	// ContentType corresponds to the 'content-type' property
	ContentType *string

	// CorrelationID corresponds to the 'correlation-id' property.
	// The type of CorrelationID can be a uint64, UUID, []byte, or a string
	CorrelationID any

	// CreationTime corresponds to the 'creation-time' property.
	CreationTime *time.Time

	// GroupID corresponds to the 'group-id' property.
	GroupID *string

	// GroupSequence corresponds to the 'group-sequence' property.
	GroupSequence *uint32

	// MessageID corresponds to the 'message-id' property.
	// The type of MessageID can be a uint64, UUID, []byte, or string
	MessageID any

	// ReplyTo corresponds to the 'reply-to' property.
	ReplyTo *string

	// ReplyToGroupID corresponds to the 'reply-to-group-id' property.
	ReplyToGroupID *string

	// Subject corresponds to the 'subject' property.
	Subject *string

	// To corresponds to the 'to' property.
	To *string

	// UserID corresponds to the 'user-id' property.
	UserID []byte
}
AMQPAnnotatedMessageProperties represents the properties of an AMQP message. See here for more details: http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties
type AddEventDataOptions ¶
type AddEventDataOptions struct { }
AddEventDataOptions contains optional parameters for the AddEventData function.
type Checkpoint ¶ added in v0.1.1
type Checkpoint struct {
	ConsumerGroup           string
	EventHubName            string
	FullyQualifiedNamespace string
	PartitionID             string

	Offset         *int64 // the last successfully processed Offset.
	SequenceNumber *int64 // the last successfully processed SequenceNumber.
}
Checkpoint tracks the last successfully processed event in a partition.
type CheckpointStore ¶ added in v0.1.1
type CheckpointStore interface {
	// ClaimOwnership attempts to claim ownership of the partitions in partitionOwnership and returns
	// the actual partitions that were claimed.
	ClaimOwnership(ctx context.Context, partitionOwnership []Ownership, options *ClaimOwnershipOptions) ([]Ownership, error)

	// ListCheckpoints lists all the available checkpoints.
	ListCheckpoints(ctx context.Context, fullyQualifiedNamespace string, eventHubName string, consumerGroup string, options *ListCheckpointsOptions) ([]Checkpoint, error)

	// ListOwnership lists all ownerships.
	ListOwnership(ctx context.Context, fullyQualifiedNamespace string, eventHubName string, consumerGroup string, options *ListOwnershipOptions) ([]Ownership, error)

	// SetCheckpoint updates a specific checkpoint with a sequence and offset.
	SetCheckpoint(ctx context.Context, checkpoint Checkpoint, options *SetCheckpointOptions) error
}
CheckpointStore is used by multiple consumers to coordinate progress and ownership for partitions.
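The sketch below (not part of the package documentation) shows the built-in blob-backed implementation from the checkpoints subpackage being used through this interface; the storage endpoint, container name, namespace, and Event Hub name are placeholders.

package main

import (
	"context"
	"log"

	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
)

func main() {
	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	// Placeholder storage endpoint and container name; the container must already exist.
	blobClient, err := azblob.NewClient("https://<storage-account>.blob.core.windows.net", cred, nil)
	if err != nil {
		panic(err)
	}

	containerClient := blobClient.ServiceClient().NewContainerClient("<container-name>")

	// The blob-backed store satisfies the CheckpointStore interface.
	var store azeventhubs.CheckpointStore

	store, err = checkpoints.NewBlobStore(containerClient, nil)
	if err != nil {
		panic(err)
	}

	// List any checkpoints already recorded for this Event Hub and consumer group.
	existing, err := store.ListCheckpoints(context.TODO(),
		"<namespace>.servicebus.windows.net", "<event hub name>", azeventhubs.DefaultConsumerGroup, nil)
	if err != nil {
		panic(err)
	}

	log.Printf("Found %d checkpoint(s)", len(existing))
}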
type ClaimOwnershipOptions ¶ added in v0.1.1
type ClaimOwnershipOptions struct { }
ClaimOwnershipOptions contains optional parameters for the ClaimOwnership function
type ConnectionStringProperties ¶ added in v0.5.0
type ConnectionStringProperties = exported.ConnectionStringProperties
ConnectionStringProperties are the properties of a connection string as returned by ParseConnectionString.
func ParseConnectionString ¶ added in v0.5.0
func ParseConnectionString(connStr string) (ConnectionStringProperties, error)
ParseConnectionString takes a connection string from the Azure portal and returns the parsed representation.
There are two supported formats:
- Connection strings generated from the portal (or elsewhere) that contain an embedded key and keyname.
- A connection string with an embedded SharedAccessSignature: Endpoint=sb://<sb>.servicebus.windows.net;SharedAccessSignature=SharedAccessSignature sr=<sb>.servicebus.windows.net&sig=<base64-sig>&se=<expiry>&skn=<keyname>"
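A short sketch of parsing a connection string; the value is a placeholder, and the FullyQualifiedNamespace and EntityPath fields referenced here are assumed from ConnectionStringProperties rather than shown in this excerpt.

package main

import (
	"fmt"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {
	// Placeholder connection string of the first (key-based) form, with an EntityPath.
	connStr := "Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<event hub name>"

	props, err := azeventhubs.ParseConnectionString(connStr)
	if err != nil {
		panic(err)
	}

	// FullyQualifiedNamespace and EntityPath are assumed field names on ConnectionStringProperties.
	fmt.Printf("Namespace: %s\n", props.FullyQualifiedNamespace)

	if props.EntityPath != nil {
		fmt.Printf("Event Hub: %s\n", *props.EntityPath)
	}
}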
type ConsumerClient ¶
type ConsumerClient struct {
// contains filtered or unexported fields
}
ConsumerClient can create PartitionClient instances, which can read events from a partition.
func NewConsumerClient ¶
func NewConsumerClient(fullyQualifiedNamespace string, eventHub string, consumerGroup string, credential azcore.TokenCredential, options *ConsumerClientOptions) (*ConsumerClient, error)
NewConsumerClient creates a ConsumerClient which uses an azcore.TokenCredential for authentication. You MUST call ConsumerClient.Close on this client to avoid leaking resources.
The fullyQualifiedNamespace is the Event Hubs namespace name (ex: myeventhub.servicebus.windows.net). The credential is one of the credentials in the azidentity package.
Example ¶
package main

import (
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var consumerClient *azeventhubs.ConsumerClient

func main() {
	// `DefaultAzureCredential` tries several common credential types. For more credential types
	// see this link: https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/azidentity#readme-credential-types.
	defaultAzureCred, err := azidentity.NewDefaultAzureCredential(nil)

	if err != nil {
		panic(err)
	}

	consumerClient, err = azeventhubs.NewConsumerClient("<ex: myeventhubnamespace.servicebus.windows.net>", "eventhub-name", azeventhubs.DefaultConsumerGroup, defaultAzureCred, nil)

	if err != nil {
		panic(err)
	}
}
Output:
Example (UsingCustomEndpoint) ¶
package main

import (
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var consumerClient *azeventhubs.ConsumerClient

func main() {
	// `DefaultAzureCredential` tries several common credential types. For more credential types
	// see this link: https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/azidentity#readme-credential-types.
	defaultAzureCred, err := azidentity.NewDefaultAzureCredential(nil)

	if err != nil {
		panic(err)
	}

	consumerClient, err = azeventhubs.NewConsumerClient("<ex: myeventhubnamespace.servicebus.windows.net>", "eventhub-name", azeventhubs.DefaultConsumerGroup, defaultAzureCred, &azeventhubs.ConsumerClientOptions{
		// A custom endpoint can be used when you need to connect to a TCP proxy.
		CustomEndpoint: "<address/hostname of TCP proxy>",
	})

	if err != nil {
		panic(err)
	}
}
Output:
func NewConsumerClientFromConnectionString ¶
func NewConsumerClientFromConnectionString(connectionString string, eventHub string, consumerGroup string, options *ConsumerClientOptions) (*ConsumerClient, error)
NewConsumerClientFromConnectionString creates a ConsumerClient from a connection string. You MUST call ConsumerClient.Close on this client to avoid leaking resources.
connectionString can be one of two formats - with or without an EntityPath key.
When the connection string does not have an entity path, as shown below, the eventHub parameter cannot be empty and should contain the name of your event hub.
Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>
When the connection string DOES have an entity path, as shown below, the eventHub parameter must be empty.
Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<entity path>;
Example ¶
package main

import (
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var consumerClient *azeventhubs.ConsumerClient
var err error

func main() {
	// if the connection string contains an EntityPath
	connectionString := "Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<entity path>"
	consumerClient, err = azeventhubs.NewConsumerClientFromConnectionString(connectionString, "", azeventhubs.DefaultConsumerGroup, nil)

	// or

	// if the connection string does not contain an EntityPath
	connectionString = "Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"
	consumerClient, err = azeventhubs.NewConsumerClientFromConnectionString(connectionString, "eventhub-name", azeventhubs.DefaultConsumerGroup, nil)

	if err != nil {
		panic(err)
	}
}
Output:
func (*ConsumerClient) Close ¶
func (cc *ConsumerClient) Close(ctx context.Context) error
Close releases resources for this client.
func (*ConsumerClient) GetEventHubProperties ¶
func (cc *ConsumerClient) GetEventHubProperties(ctx context.Context, options *GetEventHubPropertiesOptions) (EventHubProperties, error)
GetEventHubProperties gets event hub properties, like the available partition IDs and when the Event Hub was created.
Example ¶
package main

import (
	"context"
	"fmt"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var consumerClient *azeventhubs.ConsumerClient

func main() {
	eventHubProps, err := consumerClient.GetEventHubProperties(context.TODO(), nil)

	if err != nil {
		panic(err)
	}

	for _, partitionID := range eventHubProps.PartitionIDs {
		fmt.Printf("Partition ID: %s\n", partitionID)
	}
}
Output:
func (*ConsumerClient) GetPartitionProperties ¶
func (cc *ConsumerClient) GetPartitionProperties(ctx context.Context, partitionID string, options *GetPartitionPropertiesOptions) (PartitionProperties, error)
GetPartitionProperties gets properties for a specific partition. This includes data like the last enqueued sequence number, the first sequence number and when an event was last enqueued to the partition.
Example ¶
package main

import (
	"context"
	"fmt"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var consumerClient *azeventhubs.ConsumerClient

func main() {
	partitionProps, err := consumerClient.GetPartitionProperties(context.TODO(), "partition-id", nil)

	if err != nil {
		panic(err)
	}

	fmt.Printf("First sequence number for partition ID %s: %d\n", partitionProps.PartitionID, partitionProps.BeginningSequenceNumber)
	fmt.Printf("Last sequence number for partition ID %s: %d\n", partitionProps.PartitionID, partitionProps.LastEnqueuedSequenceNumber)
}
Output:
func (*ConsumerClient) InstanceID ¶ added in v0.6.0
func (cc *ConsumerClient) InstanceID() string
InstanceID is the identifier for this ConsumerClient.
func (*ConsumerClient) NewPartitionClient ¶ added in v0.1.1
func (cc *ConsumerClient) NewPartitionClient(partitionID string, options *PartitionClientOptions) (*PartitionClient, error)
NewPartitionClient creates a client that can receive events from a partition. By default it starts at the latest point in the partition. This can be changed using the options parameter. You MUST call azeventhubs.PartitionClient.Close on the returned client to avoid leaking resources.
Example (ConfiguringPrefetch) ¶
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var consumerClient *azeventhubs.ConsumerClient

func main() {
	const partitionID = "0"

	// Prefetching configures the Event Hubs client to continually cache events, up to the configured size
	// in PartitionClientOptions.Prefetch. PartitionClient.ReceiveEvents will read from the cache first,
	// which can improve throughput in situations where you might normally be forced to request and wait
	// for more events.

	// By default, prefetch is enabled.
	partitionClient, err := consumerClient.NewPartitionClient(partitionID, nil)

	if err != nil {
		panic(err)
	}

	defer partitionClient.Close(context.TODO())

	// You can configure the prefetch buffer size as well. The default is 300.
	partitionClientWithCustomPrefetch, err := consumerClient.NewPartitionClient(partitionID, &azeventhubs.PartitionClientOptions{
		Prefetch: 301,
	})

	if err != nil {
		panic(err)
	}

	defer partitionClientWithCustomPrefetch.Close(context.TODO())

	// And prefetch can be disabled if you prefer to manually control the flow of events. Excess
	// events (that arrive after your ReceiveEvents() call has completed) will still be
	// buffered internally, but they will not be automatically replenished.
	partitionClientWithPrefetchDisabled, err := consumerClient.NewPartitionClient(partitionID, &azeventhubs.PartitionClientOptions{
		Prefetch: -1,
	})

	if err != nil {
		panic(err)
	}

	defer partitionClientWithPrefetchDisabled.Close(context.TODO())

	// Using a context with a timeout will allow ReceiveEvents() to return with events it
	// collected in a minute, or earlier if it actually gets all 100 events we requested.
	receiveCtx, cancel := context.WithTimeout(context.TODO(), time.Minute)
	defer cancel()

	events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)

	if err != nil {
		panic(err)
	}

	for _, evt := range events {
		fmt.Printf("Body: %s\n", string(evt.Body))
	}
}
Output:
Example (ReceiveEvents) ¶
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var consumerClient *azeventhubs.ConsumerClient

func main() {
	const partitionID = "0"

	partitionClient, err := consumerClient.NewPartitionClient(partitionID, nil)

	if err != nil {
		panic(err)
	}

	defer partitionClient.Close(context.TODO())

	// Using a context with a timeout will allow ReceiveEvents() to return with events it
	// collected in a minute, or earlier if it actually gets all 100 events we requested.
	receiveCtx, cancel := context.WithTimeout(context.TODO(), time.Minute)
	defer cancel()

	events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)

	if err != nil {
		panic(err)
	}

	for _, evt := range events {
		fmt.Printf("Body: %s\n", string(evt.Body))
	}
}
Output:
type ConsumerClientOptions ¶
type ConsumerClientOptions struct {
	// ApplicationID is used as the identifier when setting the User-Agent property.
	ApplicationID string

	// A custom endpoint address that can be used when establishing the connection to the service.
	CustomEndpoint string

	// InstanceID is a unique name used to identify the consumer. This can help with
	// diagnostics as this name will be returned in error messages. By default,
	// an identifier will be automatically generated.
	InstanceID string

	// NewWebSocketConn is a function that can create a net.Conn for use with websockets.
	// For an example, see ExampleNewClient_usingWebsockets() function in example_client_test.go.
	NewWebSocketConn func(ctx context.Context, args WebSocketConnParams) (net.Conn, error)

	// RetryOptions controls how often operations are retried from this client and any
	// Receivers and Senders created from this client.
	RetryOptions RetryOptions

	// TLSConfig configures a client with a custom *tls.Config.
	TLSConfig *tls.Config
}
ConsumerClientOptions configures optional parameters for a ConsumerClient.
type Error ¶ added in v0.1.1
type Error struct {
	// Code is a stable error code that can be used as part of programmatic error handling.
	Code ErrorCode
	// contains filtered or unexported fields
}

Error represents an Event Hub specific error. NOTE: the Code is considered part of the published API but the message that comes back from Error(), as well as the underlying wrapped error, are NOT and are subject to change.
type ErrorCode ¶ added in v0.2.0
ErrorCode is an error code, usable by consuming code to work with errors programmatically.
const (
	// ErrorCodeUnauthorizedAccess means the credentials provided are not valid for use with
	// a particular entity, or have expired.
	ErrorCodeUnauthorizedAccess ErrorCode = exported.ErrorCodeUnauthorizedAccess

	// ErrorCodeConnectionLost means our connection was lost and all retry attempts failed.
	// This typically reflects an extended outage or connection disruption and may
	// require manual intervention.
	ErrorCodeConnectionLost ErrorCode = exported.ErrorCodeConnectionLost

	// ErrorCodeOwnershipLost means that a partition that you were reading from was opened
	// by another link with a higher epoch/owner level.
	ErrorCodeOwnershipLost ErrorCode = exported.ErrorCodeOwnershipLost
)
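A small sketch of branching on these codes after a receive call, following the errors.As pattern used in the checkpointing example above; the partition client here is a placeholder.

package main

import (
	"context"
	"errors"
	"log"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var partitionClient *azeventhubs.PartitionClient

func main() {
	_, err := partitionClient.ReceiveEvents(context.TODO(), 100, nil)

	var eventHubErr *azeventhubs.Error

	if errors.As(err, &eventHubErr) {
		switch eventHubErr.Code {
		case azeventhubs.ErrorCodeOwnershipLost:
			// Another client with a higher owner level took ownership of this partition.
			log.Printf("Ownership lost: %s", err)
		case azeventhubs.ErrorCodeConnectionLost:
			// Retries were exhausted and the connection could not be recovered.
			log.Printf("Connection lost: %s", err)
		default:
			log.Printf("Event Hubs error %s: %s", eventHubErr.Code, err)
		}
	} else if err != nil {
		log.Printf("Non-Event Hubs error: %s", err)
	}
}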
type EventData ¶
type EventData struct {
	// Properties can be used to store custom metadata for a message.
	Properties map[string]any

	// Body is the payload for a message.
	Body []byte

	// ContentType describes the payload of the message, with a descriptor following
	// the format of Content-Type, specified by RFC2045 (ex: "application/json").
	ContentType *string

	// CorrelationID is a client-specific id that can be used to mark or identify messages
	// between clients.
	// CorrelationID can be a uint64, UUID, []byte, or string
	CorrelationID any

	// MessageID is an application-defined value that uniquely identifies
	// the message and its payload. The identifier is a free-form string.
	//
	// If enabled, the duplicate detection feature identifies and removes further submissions
	// of messages with the same MessageId.
	MessageID *string
}
EventData is an event that can be sent, using the ProducerClient, to an Event Hub.
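A brief sketch of populating the optional EventData fields described above; all values are illustrative placeholders.

package main

import (
	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {
	event := &azeventhubs.EventData{
		// Body carries the payload itself.
		Body: []byte(`{"hello":"world"}`),

		// ContentType describes the payload (here, JSON).
		ContentType: to.Ptr("application/json"),

		// Properties holds free-form, application-defined metadata.
		Properties: map[string]any{
			"source": "example-producer",
		},

		// MessageID and CorrelationID are optional, application-defined identifiers.
		MessageID:     to.Ptr("message-id-1"),
		CorrelationID: "correlation-id-1",
	}

	_ = event // add it to an EventDataBatch with AddEventData before sending
}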
type EventDataBatch ¶
type EventDataBatch struct {
// contains filtered or unexported fields
}
EventDataBatch is used to efficiently pack up EventData before sending it to Event Hubs.
EventDataBatch instances are not meant to be created directly. Use ProducerClient.NewEventDataBatch, which will create them with the proper size limit for your Event Hub.
func (*EventDataBatch) AddAMQPAnnotatedMessage ¶ added in v0.2.0
func (b *EventDataBatch) AddAMQPAnnotatedMessage(annotatedMessage *AMQPAnnotatedMessage, options *AddEventDataOptions) error
AddAMQPAnnotatedMessage adds an AMQPAnnotatedMessage to the batch, failing if the AMQPAnnotatedMessage would cause the EventDataBatch to be too large to send.
This size limit was set when the EventDataBatch was created, in options to ProducerClient.NewEventDataBatch, or (by default) from Event Hubs itself.
Returns ErrEventDataTooLarge if the message cannot fit, or a non-nil error for other failures.
func (*EventDataBatch) AddEventData ¶
func (b *EventDataBatch) AddEventData(ed *EventData, options *AddEventDataOptions) error
AddEventData adds an EventData to the batch, failing if the EventData would cause the EventDataBatch to be too large to send.
This size limit was set when the EventDataBatch was created, in options to ProducerClient.NewEventDataBatch, or (by default) from Event Hubs itself.
Returns ErrEventDataTooLarge if the event cannot fit, or a non-nil error for other failures.
Example ¶
package main

import (
	"context"
	"errors"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var producerClient *azeventhubs.ProducerClient

func main() {
	batch, err := producerClient.NewEventDataBatch(context.TODO(), nil)

	if err != nil {
		panic(err)
	}

	// can be called multiple times with new messages until you
	// receive an azeventhubs.ErrEventDataTooLarge
	err = batch.AddEventData(&azeventhubs.EventData{
		Body: []byte("hello"),
	}, nil)

	if errors.Is(err, azeventhubs.ErrEventDataTooLarge) {
		// Message was too large to fit into this batch.
		//
		// At this point you'd usually just send the batch (using ProducerClient.SendEventDataBatch),
		// create a new one, and start filling up the batch again.
		//
		// If this is the _only_ message being added to the batch then it's too big in general, and
		// will need to be split or shrunk to fit.
		panic(err)
	} else if err != nil {
		panic(err)
	}

	err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)

	if err != nil {
		panic(err)
	}
}
Output:
Example (RawAMQPMessages) ¶
package main

import (
	"context"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var producerClient *azeventhubs.ProducerClient

func main() {
	batch, err := producerClient.NewEventDataBatch(context.TODO(), nil)

	if err != nil {
		panic(err)
	}

	// This is functionally equivalent to EventDataBatch.AddEventData(), just with a more
	// advanced message format.
	// See ExampleEventDataBatch_AddEventData for more details.

	err = batch.AddAMQPAnnotatedMessage(&azeventhubs.AMQPAnnotatedMessage{
		Body: azeventhubs.AMQPAnnotatedMessageBody{
			Data: [][]byte{
				[]byte("hello"),
				[]byte("world"),
			},
		},
	}, nil)

	if err != nil {
		panic(err)
	}

	err = batch.AddAMQPAnnotatedMessage(&azeventhubs.AMQPAnnotatedMessage{
		Body: azeventhubs.AMQPAnnotatedMessageBody{
			Sequence: [][]any{
				// let the AMQP stack encode your strings (or other primitives) for you, no need
				// to convert them to bytes manually.
				{"hello", "world"},
				{"howdy", "world"},
			},
		},
	}, nil)

	if err != nil {
		panic(err)
	}

	err = batch.AddAMQPAnnotatedMessage(&azeventhubs.AMQPAnnotatedMessage{
		Body: azeventhubs.AMQPAnnotatedMessageBody{
			// let the AMQP stack encode your string (or other primitives) for you, no need
			// to convert them to bytes manually.
			Value: "hello world",
		},
	}, nil)

	if err != nil {
		panic(err)
	}

	err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)

	if err != nil {
		panic(err)
	}
}
Output:
func (*EventDataBatch) NumBytes ¶
func (b *EventDataBatch) NumBytes() uint64
NumBytes is the number of bytes in the batch.
func (*EventDataBatch) NumEvents ¶ added in v0.2.0
func (b *EventDataBatch) NumEvents() int32
NumEvents returns the number of events in the batch.
type EventDataBatchOptions ¶ added in v0.2.0
type EventDataBatchOptions struct {
	// MaxBytes overrides the max size (in bytes) for a batch.
	// By default NewEventDataBatch will use the max message size provided by the service.
	MaxBytes uint64

	// PartitionKey is hashed to calculate the partition assignment. Messages and message
	// batches with the same PartitionKey are guaranteed to end up in the same partition.
	// Note that if you use this option then PartitionID cannot be set.
	PartitionKey *string

	// PartitionID is the ID of the partition to send these messages to.
	// Note that if you use this option then PartitionKey cannot be set.
	PartitionID *string
}
EventDataBatchOptions contains optional parameters for the ProducerClient.NewEventDataBatch function.
If both PartitionKey and PartitionID are nil, Event Hubs will choose an arbitrary partition for any events in this EventDataBatch.
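A minimal sketch of creating a batch that routes by PartitionKey; the key value is a placeholder.

package main

import (
	"context"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var producerClient *azeventhubs.ProducerClient

func main() {
	// All events added to this batch hash to the same partition because they share a
	// PartitionKey. PartitionID must be left unset when PartitionKey is used.
	batch, err := producerClient.NewEventDataBatch(context.TODO(), &azeventhubs.EventDataBatchOptions{
		PartitionKey: to.Ptr("device-group-1"),
	})

	if err != nil {
		panic(err)
	}

	if err := batch.AddEventData(&azeventhubs.EventData{Body: []byte("hello")}, nil); err != nil {
		panic(err)
	}

	if err := producerClient.SendEventDataBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}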
type EventHubProperties ¶
EventHubProperties represents properties of the Event Hub, like the number of partitions.
type GetEventHubPropertiesOptions ¶
type GetEventHubPropertiesOptions struct { }
GetEventHubPropertiesOptions contains optional parameters for the GetEventHubProperties function
type GetPartitionPropertiesOptions ¶
type GetPartitionPropertiesOptions struct { }
GetPartitionPropertiesOptions are the options for the GetPartitionProperties function.
type ListCheckpointsOptions ¶ added in v0.1.1
type ListCheckpointsOptions struct { }
ListCheckpointsOptions contains optional parameters for the ListCheckpoints function
type ListOwnershipOptions ¶ added in v0.1.1
type ListOwnershipOptions struct { }
ListOwnershipOptions contains optional parameters for the ListOwnership function
type Ownership ¶ added in v0.1.1
type Ownership struct {
	ConsumerGroup           string
	EventHubName            string
	FullyQualifiedNamespace string
	PartitionID             string

	OwnerID          string       // the owner ID of the Processor
	LastModifiedTime time.Time    // used when calculating if ownership has expired
	ETag             *azcore.ETag // the ETag, used when attempting to claim or update ownership of a partition.
}
Ownership tracks which consumer owns a particular partition.
type PartitionClient ¶ added in v0.1.1
type PartitionClient struct {
// contains filtered or unexported fields
}
PartitionClient is used to receive events from an Event Hub partition.
This type is instantiated from the ConsumerClient type, using ConsumerClient.NewPartitionClient.
func (*PartitionClient) Close ¶ added in v0.1.1
func (pc *PartitionClient) Close(ctx context.Context) error
Close releases resources for this client.
func (*PartitionClient) ReceiveEvents ¶ added in v0.1.1
func (pc *PartitionClient) ReceiveEvents(ctx context.Context, count int, options *ReceiveEventsOptions) ([]*ReceivedEventData, error)
ReceiveEvents receives events until 'count' events have been received or the context has expired or been cancelled.
If your ReceiveEvents call appears to be stuck there are some common causes:
The PartitionClientOptions.StartPosition defaults to "Latest" when the client is created. The connection is lazily initialized, so it's possible the link was initialized to a position after events you've sent. To make this deterministic, you can choose an explicit start point using sequence number, offset or a timestamp. See the [PartitionClientOptions.StartPosition] field for more details.
You might have sent the events to a different partition than intended. By default, batches created using ProducerClient.NewEventDataBatch do not target a specific partition. When a partition is not specified, the Azure Event Hubs service will choose the partition the events are sent to.

To fix this, you can specify a PartitionID as part of your [EventDataBatchOptions.PartitionID] options or open multiple PartitionClient instances, one for each partition. You can get the full list of partitions at runtime using ConsumerClient.GetEventHubProperties. See example_consuming_events_test.go for an example of this pattern.
Network issues can cause internal retries. To see log messages related to this use the instructions in the example function "Example_enableLogging".
type PartitionClientOptions ¶ added in v0.2.0
type PartitionClientOptions struct {
	// StartPosition is the position we will start receiving events from,
	// either an offset (inclusive) with Offset, or receiving events received
	// after a specific time using EnqueuedTime.
	//
	// NOTE: you can also use the [Processor], which will automatically manage the start
	// value using a [CheckpointStore]. See [example_consuming_with_checkpoints_test.go] for an
	// example.
	//
	// [example_consuming_with_checkpoints_test.go]: https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs/example_consuming_with_checkpoints_test.go
	StartPosition StartPosition

	// OwnerLevel is the priority for this partition client, also known as the 'epoch' level.
	// When used, a partition client with a higher OwnerLevel will take ownership of a partition
	// from partition clients with a lower OwnerLevel.
	// Default is off.
	OwnerLevel *int64

	// Prefetch represents the size of the internal prefetch buffer. When set,
	// this client will attempt to always maintain an internal cache of events of
	// this size, asynchronously, increasing the odds that ReceiveEvents() will use
	// a locally stored cache of events, rather than having to wait for events to
	// arrive from the network.
	//
	// Defaults to 300 events if Prefetch == 0.
	// Disabled if Prefetch < 0.
	Prefetch int32
}
PartitionClientOptions provides options for the NewPartitionClient function.
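A hedged sketch of configuring a non-default start position and owner level; the StartPosition field names used here (SequenceNumber, Inclusive) are assumed from the StartPosition type, which is not expanded in this excerpt, and the partition ID and sequence number are placeholders.

package main

import (
	"context"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var consumerClient *azeventhubs.ConsumerClient

func main() {
	partitionClient, err := consumerClient.NewPartitionClient("0", &azeventhubs.PartitionClientOptions{
		// Start from a specific sequence number rather than the default (Latest).
		// SequenceNumber and Inclusive are assumed field names; see the StartPosition
		// type for the authoritative list.
		StartPosition: azeventhubs.StartPosition{
			SequenceNumber: to.Ptr[int64](1001),
			Inclusive:      true,
		},

		// A higher OwnerLevel ("epoch") lets this client take over the partition
		// from readers with a lower level.
		OwnerLevel: to.Ptr[int64](1),
	})

	if err != nil {
		panic(err)
	}

	defer partitionClient.Close(context.TODO())
}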
type PartitionProperties ¶
type PartitionProperties struct {
	// BeginningSequenceNumber is the first sequence number for a partition.
	BeginningSequenceNumber int64

	// EventHubName is the name of the Event Hub for this partition.
	EventHubName string

	// IsEmpty is true if the partition is empty, false otherwise.
	IsEmpty bool

	// LastEnqueuedOffset is the offset of latest enqueued event.
	LastEnqueuedOffset int64

	// LastEnqueuedOn is the date of latest enqueued event.
	LastEnqueuedOn time.Time

	// LastEnqueuedSequenceNumber is the sequence number of the latest enqueued event.
	LastEnqueuedSequenceNumber int64

	// PartitionID is the partition ID of this partition.
	PartitionID string
}
PartitionProperties are the properties for a single partition.
type Processor ¶ added in v0.1.1
type Processor struct {
// contains filtered or unexported fields
}
Processor uses a ConsumerClient and CheckpointStore to provide automatic load balancing between multiple Processor instances, even in separate processes or on separate machines.
See example_consuming_with_checkpoints_test.go for an example, and the function documentation for [Run] for a more detailed description of how load balancing works.
func NewProcessor ¶ added in v0.1.1
func NewProcessor(consumerClient *ConsumerClient, checkpointStore CheckpointStore, options *ProcessorOptions) (*Processor, error)
NewProcessor creates a Processor.
More information can be found in the documentation for the Processor type or the example_consuming_with_checkpoints_test.go for an example.
func (*Processor) NextPartitionClient ¶ added in v0.1.1
func (p *Processor) NextPartitionClient(ctx context.Context) *ProcessorPartitionClient
NextPartitionClient will get the next owned ProcessorPartitionClient if one is acquired or will block until a new one arrives or Processor.Run is cancelled. When the Processor stops running this function will return nil.
NOTE: You MUST call ProcessorPartitionClient.Close on the returned client to avoid leaking resources.
See example_consuming_with_checkpoints_test.go for an example of typical usage.
func (*Processor) Run ¶ added in v0.1.1
func (p *Processor) Run(ctx context.Context) error

Run handles the load balancing loop, blocking until the passed in context is cancelled or it encounters an unrecoverable error. On cancellation, it will return a nil error.
This function should run for the lifetime of your application, or for as long as you want to continue to claim and process partitions.
Once a Processor has been stopped it cannot be restarted and a new instance must be created.
As partitions are claimed new ProcessorPartitionClient instances will be returned from Processor.NextPartitionClient. This can happen at any time, based on new Processor instances coming online, as well as other Processors exiting.
ProcessorPartitionClient instances are used like a PartitionClient but provide a ProcessorPartitionClient.UpdateCheckpoint function that will store a checkpoint into the CheckpointStore. If the client were to crash, or be restarted, it will pick up from the last checkpoint.
See example_consuming_with_checkpoints_test.go for an example of typical usage.
type ProcessorOptions ¶ added in v0.2.0
type ProcessorOptions struct {
	// LoadBalancingStrategy dictates how concurrent Processor instances distribute
	// ownership of partitions between them.
	// The default strategy is ProcessorStrategyBalanced.
	LoadBalancingStrategy ProcessorStrategy

	// UpdateInterval controls how often the Processor attempts to claim partitions.
	// The default value is 10 seconds.
	UpdateInterval time.Duration

	// PartitionExpirationDuration is the amount of time before a partition is considered
	// unowned.
	// The default value is 60 seconds.
	PartitionExpirationDuration time.Duration

	// StartPositions are the default start positions (configurable per partition, or with an overall
	// default value) if a checkpoint is not found in the CheckpointStore.
	// The default position is Latest.
	StartPositions StartPositions

	// Prefetch represents the size of the internal prefetch buffer for each ProcessorPartitionClient
	// created by this Processor. When set, this client will attempt to always maintain
	// an internal cache of events of this size, asynchronously, increasing the odds that
	// ReceiveEvents() will use a locally stored cache of events, rather than having to
	// wait for events to arrive from the network.
	//
	// Defaults to 300 events if Prefetch == 0.
	// Disabled if Prefetch < 0.
	Prefetch int32
}
ProcessorOptions are the options for the NewProcessor function.
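A small sketch of passing non-default ProcessorOptions to NewProcessor; the greedy strategy and shorter update interval are illustrative choices, and the StartPositions.Default field is assumed from the StartPositions type, which is not expanded in this excerpt.

package main

import (
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var consumerClient *azeventhubs.ConsumerClient
var checkpointStore azeventhubs.CheckpointStore

func main() {
	processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, &azeventhubs.ProcessorOptions{
		// Claim as many partitions as possible at once, rather than converging slowly to an even split.
		LoadBalancingStrategy: azeventhubs.ProcessorStrategyGreedy,

		// Attempt to claim partitions more often than the 10 second default.
		UpdateInterval: 5 * time.Second,

		// If no checkpoint exists yet, start every partition from the earliest event.
		// Default is an assumed field name of StartPositions.
		StartPositions: azeventhubs.StartPositions{
			Default: azeventhubs.StartPosition{Earliest: to.Ptr(true)},
		},
	})

	if err != nil {
		panic(err)
	}

	_ = processor // call processor.Run and NextPartitionClient as shown in the checkpointing example above
}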
type ProcessorPartitionClient ¶ added in v0.1.1
type ProcessorPartitionClient struct {
// contains filtered or unexported fields
}
ProcessorPartitionClient allows you to receive events, similar to a PartitionClient, with a checkpoint store for tracking progress.
This type is instantiated from Processor.NextPartitionClient, which handles load balancing of partition ownership between multiple Processor instances.
See example_consuming_with_checkpoints_test.go for an example.
NOTE: If you do NOT want to use dynamic load balancing, and would prefer to track state and ownership manually, use the ConsumerClient instead.
func (*ProcessorPartitionClient) Close ¶ added in v0.1.1
func (c *ProcessorPartitionClient) Close(ctx context.Context) error
Close releases resources for the partition client. This does not close the ConsumerClient that the Processor was started with.
func (*ProcessorPartitionClient) PartitionID ¶ added in v0.1.1
func (p *ProcessorPartitionClient) PartitionID() string
PartitionID is the partition ID of the partition we're receiving from. This will not change during the lifetime of this ProcessorPartitionClient.
func (*ProcessorPartitionClient) ReceiveEvents ¶ added in v0.1.1
func (c *ProcessorPartitionClient) ReceiveEvents(ctx context.Context, count int, options *ReceiveEventsOptions) ([]*ReceivedEventData, error)
ReceiveEvents receives events until 'count' events have been received or the context has been cancelled.
See PartitionClient.ReceiveEvents for more information, including troubleshooting.
func (*ProcessorPartitionClient) UpdateCheckpoint ¶ added in v0.1.1
func (p *ProcessorPartitionClient) UpdateCheckpoint(ctx context.Context, latestEvent *ReceivedEventData, options *UpdateCheckpointOptions) error
UpdateCheckpoint updates the checkpoint in the CheckpointStore. New Processors will resume after this checkpoint for this partition.
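For illustration, a per-partition loop that receives events and checkpoints progress could look like the sketch below. partitionClient is assumed to come from Processor.NextPartitionClient, and the one minute timeout and batch size of 100 are arbitrary choices.

for {
	// Wait up to a minute for up to 100 events. DeadlineExceeded just means the
	// timeout elapsed before the batch filled, so it isn't treated as fatal.
	receiveCtx, cancel := context.WithTimeout(context.TODO(), time.Minute)
	events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
	cancel()

	if err != nil && !errors.Is(err, context.DeadlineExceeded) {
		return err
	}

	if len(events) == 0 {
		continue
	}

	// ... process the events ...

	// Checkpoint at the last event so a restarted Processor resumes from here.
	if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
		return err
	}
}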
type ProcessorStrategy ¶ added in v0.1.1
type ProcessorStrategy string
ProcessorStrategy specifies the load balancing strategy used by the Processor.
const (
	// ProcessorStrategyBalanced will attempt to claim a single partition at a time, until each active
	// owner has an equal share of partitions.
	// This is the default strategy.
	ProcessorStrategyBalanced ProcessorStrategy = "balanced"

	// ProcessorStrategyGreedy will attempt to claim as many partitions at a time as it can, ignoring
	// balance.
	ProcessorStrategyGreedy ProcessorStrategy = "greedy"
)
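For example, to opt into greedy claiming instead of the balanced default (a sketch, assuming consumerClient and checkpointStore already exist):

processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, &azeventhubs.ProcessorOptions{
	// Claim as many unowned partitions as possible on each load balancing cycle.
	LoadBalancingStrategy: azeventhubs.ProcessorStrategyGreedy,
})

if err != nil {
	panic(err)
}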
type ProducerClient ¶
type ProducerClient struct {
// contains filtered or unexported fields
}
ProducerClient can be used to send events to an Event Hub.
func NewProducerClient ¶
func NewProducerClient(fullyQualifiedNamespace string, eventHub string, credential azcore.TokenCredential, options *ProducerClientOptions) (*ProducerClient, error)
NewProducerClient creates a ProducerClient which uses an azcore.TokenCredential for authentication. You MUST call ProducerClient.Close on this client to avoid leaking resources.
The fullyQualifiedNamespace is the Event Hubs namespace name (ex: myeventhub.servicebus.windows.net). The credential is one of the credentials in the azidentity package.
Example ¶
package main

import (
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var producerClient *azeventhubs.ProducerClient

func main() {
	// `DefaultAzureCredential` tries several common credential types. For more credential types
	// see this link: https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/azidentity#readme-credential-types.
	defaultAzureCred, err := azidentity.NewDefaultAzureCredential(nil)

	if err != nil {
		panic(err)
	}

	producerClient, err = azeventhubs.NewProducerClient("<ex: myeventhubnamespace.servicebus.windows.net>", "eventhub-name", defaultAzureCred, nil)

	if err != nil {
		panic(err)
	}
}
Output:
Example (UsingCustomEndpoint) ¶
package main

import (
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var producerClient *azeventhubs.ProducerClient

func main() {
	// `DefaultAzureCredential` tries several common credential types. For more credential types
	// see this link: https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/azidentity#readme-credential-types.
	defaultAzureCred, err := azidentity.NewDefaultAzureCredential(nil)

	if err != nil {
		panic(err)
	}

	producerClient, err = azeventhubs.NewProducerClient("<ex: myeventhubnamespace.servicebus.windows.net>", "eventhub-name", defaultAzureCred, &azeventhubs.ProducerClientOptions{
		// A custom endpoint can be used when you need to connect to a TCP proxy.
		CustomEndpoint: "<address/hostname of TCP proxy>",
	})

	if err != nil {
		panic(err)
	}
}
Output:
func NewProducerClientFromConnectionString ¶
func NewProducerClientFromConnectionString(connectionString string, eventHub string, options *ProducerClientOptions) (*ProducerClient, error)
NewProducerClientFromConnectionString creates a ProducerClient from a connection string. You MUST call ProducerClient.Close on this client to avoid leaking resources.
connectionString can be one of two formats - with or without an EntityPath key.
When the connection string does not have an entity path, as shown below, the eventHub parameter cannot be empty and should contain the name of your event hub.
Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>
When the connection string DOES have an entity path, as shown below, the eventHub parameter must be empty.
Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<entity path>;
Example ¶
// if the connection string contains an EntityPath
// connectionString := "Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<entity path>"
producerClient, err = azeventhubs.NewProducerClientFromConnectionString(connectionString, "", nil)

// or

// if the connection string does not contain an EntityPath
connectionString = "Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"
producerClient, err = azeventhubs.NewProducerClientFromConnectionString(connectionString, "eventhub-name", nil)

if err != nil {
	panic(err)
}
Output:
func (*ProducerClient) Close ¶
func (pc *ProducerClient) Close(ctx context.Context) error
Close releases resources for this client.
func (*ProducerClient) GetEventHubProperties ¶
func (pc *ProducerClient) GetEventHubProperties(ctx context.Context, options *GetEventHubPropertiesOptions) (EventHubProperties, error)
GetEventHubProperties gets event hub properties, like the available partition IDs and when the Event Hub was created.
Example ¶
package main

import (
	"context"
	"fmt"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var producerClient *azeventhubs.ProducerClient

func main() {
	eventHubProps, err := producerClient.GetEventHubProperties(context.TODO(), nil)

	if err != nil {
		panic(err)
	}

	for _, partitionID := range eventHubProps.PartitionIDs {
		fmt.Printf("Partition ID: %s\n", partitionID)
	}
}
Output:
func (*ProducerClient) GetPartitionProperties ¶
func (pc *ProducerClient) GetPartitionProperties(ctx context.Context, partitionID string, options *GetPartitionPropertiesOptions) (PartitionProperties, error)
GetPartitionProperties gets properties for a specific partition. This includes data like the last enqueued sequence number, the first sequence number and when an event was last enqueued to the partition.
Example ¶
package main

import (
	"context"
	"fmt"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var producerClient *azeventhubs.ProducerClient

func main() {
	partitionProps, err := producerClient.GetPartitionProperties(context.TODO(), "partition-id", nil)

	if err != nil {
		panic(err)
	}

	fmt.Printf("First sequence number for partition ID %s: %d\n", partitionProps.PartitionID, partitionProps.BeginningSequenceNumber)
	fmt.Printf("Last sequence number for partition ID %s: %d\n", partitionProps.PartitionID, partitionProps.LastEnqueuedSequenceNumber)
}
Output:
func (*ProducerClient) NewEventDataBatch ¶
func (pc *ProducerClient) NewEventDataBatch(ctx context.Context, options *EventDataBatchOptions) (*EventDataBatch, error)
NewEventDataBatch can be used to create an EventDataBatch, which can contain multiple events.
EventDataBatch contains logic to ensure that it doesn't exceed the maximum size for the Event Hubs link, using its azeventhubs.EventDataBatch.AddEventData function. A lower size limit can also be configured through the options.
NOTE: if options is nil or empty, Event Hubs will choose an arbitrary partition for any events in this EventDataBatch.
If the operation fails it can return an azeventhubs.Error type if the failure is actionable.
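A short sketch of building and sending a batch aimed at a single partition; it assumes producerClient already exists, and that EventDataBatchOptions exposes a PartitionID field (check EventDataBatchOptions for the authoritative set of options):

// Target every event in this batch at partition "0". Passing nil options instead
// lets Event Hubs choose an arbitrary partition.
batch, err := producerClient.NewEventDataBatch(context.TODO(), &azeventhubs.EventDataBatchOptions{
	PartitionID: to.Ptr("0"),
})

if err != nil {
	panic(err)
}

// AddEventData fails if the event would push the batch past the link's size limit;
// at that point you would send the current batch and start a new one.
if err := batch.AddEventData(&azeventhubs.EventData{Body: []byte("hello")}, nil); err != nil {
	panic(err)
}

if err := producerClient.SendEventDataBatch(context.TODO(), batch, nil); err != nil {
	panic(err)
}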
func (*ProducerClient) SendEventDataBatch ¶ added in v0.3.0
func (pc *ProducerClient) SendEventDataBatch(ctx context.Context, batch *EventDataBatch, options *SendEventDataBatchOptions) error
SendEventDataBatch sends an event data batch to Event Hubs.
Example ¶
package main

import (
	"context"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var producerClient *azeventhubs.ProducerClient

func main() {
	batch, err := producerClient.NewEventDataBatch(context.TODO(), nil)

	if err != nil {
		panic(err)
	}

	// See ExampleProducerClient_AddEventData for more information.
	err = batch.AddEventData(&azeventhubs.EventData{Body: []byte("hello")}, nil)

	if err != nil {
		panic(err)
	}

	err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)

	if err != nil {
		panic(err)
	}
}
Output:
type ProducerClientOptions ¶
type ProducerClientOptions struct {
	// Application ID that will be passed to the namespace.
	ApplicationID string

	// A custom endpoint address that can be used when establishing the connection to the service.
	CustomEndpoint string

	// NewWebSocketConn is a function that can create a net.Conn for use with websockets.
	// For an example, see ExampleNewClient_usingWebsockets() function in example_client_test.go.
	NewWebSocketConn func(ctx context.Context, params WebSocketConnParams) (net.Conn, error)

	// RetryOptions controls how often operations are retried from this client and any
	// Receivers and Senders created from this client.
	RetryOptions RetryOptions

	// TLSConfig configures a client with a custom *tls.Config.
	TLSConfig *tls.Config
}
ProducerClientOptions contains options for the `NewProducerClient` and `NewProducerClientFromConnectionString` functions.
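A sketch of passing options to NewProducerClient; defaultAzureCred is assumed to already exist, and the RetryOptions field names shown here are the usual azcore-style knobs, so check RetryOptions for the authoritative definition:

producerClient, err := azeventhubs.NewProducerClient(
	"<ex: myeventhubnamespace.servicebus.windows.net>",
	"eventhub-name",
	defaultAzureCred,
	&azeventhubs.ProducerClientOptions{
		// Passed to the namespace to identify this application.
		ApplicationID: "my-application",

		// Example retry tuning (assumed field names; see RetryOptions).
		RetryOptions: azeventhubs.RetryOptions{
			MaxRetries:    5,
			RetryDelay:    2 * time.Second,
			MaxRetryDelay: 30 * time.Second,
		},
	})

if err != nil {
	panic(err)
}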
type ReceiveEventsOptions ¶
type ReceiveEventsOptions struct { }
ReceiveEventsOptions contains optional parameters for the ReceiveEvents function.
type ReceivedEventData ¶
type ReceivedEventData struct {
	EventData

	// EnqueuedTime is the UTC time when the message was accepted and stored by Event Hubs.
	EnqueuedTime *time.Time

	// PartitionKey is used with a partitioned entity and enables assigning related messages
	// to the same internal partition. This ensures that the submission sequence order is correctly
	// recorded. The partition is chosen by a hash function in Event Hubs and cannot be chosen
	// directly.
	PartitionKey *string

	// Offset is the offset of the event.
	Offset int64

	// RawAMQPMessage is the AMQP message, as received by the client. This can be useful to get access
	// to properties that are not exposed by ReceivedEventData such as payloads encoded into the
	// Value or Sequence section, payloads sent as multiple Data sections, as well as Footer
	// and Header fields.
	RawAMQPMessage *AMQPAnnotatedMessage

	// SequenceNumber is a unique number assigned to a message by Event Hubs.
	SequenceNumber int64

	// Properties set by the Event Hubs service.
	SystemProperties map[string]any
}
ReceivedEventData is an event that has been received using the ConsumerClient.
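A small sketch of inspecting these fields on events returned from ReceiveEvents; the events slice is assumed to come from a PartitionClient or ProcessorPartitionClient:

for _, event := range events {
	// EnqueuedTime is a *time.Time and may be nil, so guard before dereferencing.
	enqueued := "<unknown>"

	if event.EnqueuedTime != nil {
		enqueued = event.EnqueuedTime.UTC().String()
	}

	fmt.Printf("Sequence %d (offset %d, enqueued %s): %s\n",
		event.SequenceNumber, event.Offset, enqueued, string(event.Body))
}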
type RetryOptions ¶
type RetryOptions = exported.RetryOptions
RetryOptions represent the options for retries.
type SendEventDataBatchOptions ¶ added in v0.3.0
type SendEventDataBatchOptions struct { }
SendEventDataBatchOptions contains optional parameters for the SendEventDataBatch function.
type SetCheckpointOptions ¶ added in v1.0.0
type SetCheckpointOptions struct { }
SetCheckpointOptions contains optional parameters for the UpdateCheckpoint function.
type StartPosition ¶
type StartPosition struct {
	// Offset will start the consumer after the specified offset. Can be exclusive
	// or inclusive, based on the Inclusive property.
	// NOTE: offsets are not stable values, and might refer to different events over time
	// as the Event Hub events reach their age limit and are discarded.
	Offset *int64

	// SequenceNumber will start the consumer after the specified sequence number. Can be exclusive
	// or inclusive, based on the Inclusive property.
	SequenceNumber *int64

	// EnqueuedTime will start the consumer before events that were enqueued on or after EnqueuedTime.
	// Can be exclusive or inclusive, based on the Inclusive property.
	EnqueuedTime *time.Time

	// Inclusive configures whether the events directly at Offset, SequenceNumber or EnqueuedTime will be included (true)
	// or excluded (false).
	Inclusive bool

	// Earliest will start the consumer at the earliest event.
	Earliest *bool

	// Latest will start the consumer after the last event.
	Latest *bool
}
StartPosition indicates the position to start receiving events within a partition. The default position is Latest.
You can set this in the options for ConsumerClient.NewPartitionClient.
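For example (a sketch assuming consumerClient already exists), starting a partition client at a specific sequence number and including the event at that sequence number:

partitionClient, err := consumerClient.NewPartitionClient("0", &azeventhubs.PartitionClientOptions{
	StartPosition: azeventhubs.StartPosition{
		// Begin at sequence number 1000, including the event with that sequence number.
		SequenceNumber: to.Ptr(int64(1000)),
		Inclusive:      true,
	},
})

if err != nil {
	panic(err)
}

defer partitionClient.Close(context.TODO())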
type StartPositions ¶ added in v0.1.1
type StartPositions struct {
	// PerPartition controls the start position for a specific partition,
	// by partition ID. If a partition is not configured here it will default
	// to Default start position.
	PerPartition map[string]StartPosition

	// Default is used if the partition is not found in the PerPartition map.
	Default StartPosition
}
StartPositions are used if there is no checkpoint for a partition in the checkpoint store.
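As a sketch (assuming consumerClient and checkpointStore already exist), configuring where the Processor starts for partitions that have no checkpoint yet:

processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, &azeventhubs.ProcessorOptions{
	StartPositions: azeventhubs.StartPositions{
		// Partition "0" starts from the earliest available event if it has no checkpoint.
		PerPartition: map[string]azeventhubs.StartPosition{
			"0": {Earliest: to.Ptr(true)},
		},
		// All other partitions without a checkpoint start from the latest event.
		Default: azeventhubs.StartPosition{Latest: to.Ptr(true)},
	},
})

if err != nil {
	panic(err)
}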
type UpdateCheckpointOptions ¶ added in v0.1.1
type UpdateCheckpointOptions struct { }
UpdateCheckpointOptions contains optional parameters for the ProcessorPartitionClient.UpdateCheckpoint function.
type WebSocketConnParams ¶ added in v0.2.0
type WebSocketConnParams = exported.WebSocketConnParams
WebSocketConnParams are passed to your web socket creation function (ClientOptions.NewWebSocketConn).
Source Files ¶
Directories ¶
Path | Synopsis
---|---
amqpwrap | Package amqpwrap has some simple wrappers to make it easier to abstract the go-amqp types.
auth | Package auth provides an abstraction over claims-based security for Azure Event Hub and Service Bus.
mock | Package mock is a generated GoMock package.