README
Microsoft Azure Event Hubs Client for Golang
Azure Event Hubs is a highly scalable publish-subscribe service that can ingest millions of events per second and stream them into multiple applications. This lets you process and analyze the massive amounts of data produced by your connected devices and applications. Once Event Hubs has collected the data, you can retrieve, transform and store it by using any real-time analytics provider or with batching/storage adapters.
Refer to the online documentation to learn more about Event Hubs in general.
This library is a pure Golang implementation of Azure Event Hubs over AMQP.
Installing the library
Use go get
to acquire and install from source. Versions of the project after 1.0.1 use Go modules exclusively, which
means you'll need Go 1.11 or later to ensure all of the dependencies are properly versioned.
For more information on modules, see the Go modules wiki.
go get -u github.com/Azure/azure-event-hubs-go/...
Using Event Hubs
In this section we'll cover some basics of the library to help you get started.
This library has two main dependencies, vcabbage/amqp and Azure AMQP Common. The former provides the AMQP protocol implementation and the latter provides some common authentication, persistence and request-response message flows.
Quick start
Let's send and receive "hello, world!"
.
package main
import (
"context"
"fmt"
"os"
"os/signal"
"time"
"github.com/Azure/azure-event-hubs-go"
)
func main() {
connStr := "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName"
hub, err := eventhub.NewHubFromConnectionString(connStr)
if err != nil {
// handle err
}
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
// send a single message into a random partition
err = hub.Send(ctx, eventhub.NewEventFromString("hello, world!"))
if err != nil {
// handle error
}
handler := func(c context.Context, event *eventhub.Event) error {
fmt.Println(string(event.Data))
return nil
}
// listen to each partition of the Event Hub
runtimeInfo, err := hub.GetRuntimeInformation(ctx)
if err != nil {
// handle err
}
for _, partitionID := range runtimeInfo.PartitionIDs {
// Start receiving messages
//
// Receive blocks while attempting to connect to hub, then runs until listenerHandle.Close() is called
// <- listenerHandle.Done() signals listener has died
// listenerHandle.Err() provides the last error the receiver encountered
listenerHandle, err := hub.Receive(ctx, partitionID, handler, eventhub.ReceiveWithLatestOffset())
if err != nil {
// handle err
}
}
// Wait for a signal to quit:
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, os.Kill)
<-signalChan
hub.Close(context.Background())
}
Environment Variables
In the above example, the Hub
instance was created using environment variables. Here is a list of environment
variables used in this project.
Event Hub env vars
EVENTHUB_NAMESPACE
the namespace of the Event Hub instanceEVENTHUB_NAME
the name of the Event Hub instance
SAS TokenProvider environment variables:
There are two sets of environment variables which can produce a SAS TokenProvider
-
Expected Environment Variables:
EVENTHUB_KEY_NAME
the name of the Event Hub keyEVENTHUB_KEY_VALUE
the secret for the Event Hub key named inEVENTHUB_KEY_NAME
-
Expected Environment Variable:
EVENTHUB_CONNECTION_STRING
connection string from the Azure portal like:Endpoint=sb://foo.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=fluffypuppy;EntityPath=hubName
AAD TokenProvider environment variables:
- Client Credentials: attempt to authenticate with a Service Principal via
AZURE_TENANT_ID
the Azure Tenant IDAZURE_CLIENT_ID
the Azure Application IDAZURE_CLIENT_SECRET
a key / secret for the corresponding application
- Client Certificate: attempt to authenticate with a Service Principal via
AZURE_TENANT_ID
the Azure Tenant IDAZURE_CLIENT_ID
the Azure Application IDAZURE_CERTIFICATE_PATH
the path to the certificate fileAZURE_CERTIFICATE_PASSWORD
the password for the certificate
The Azure Environment used can be specified using the name of the Azure Environment set in "AZURE_ENVIRONMENT" var.
Authentication
Event Hubs offers a couple different paths for authentication, shared access signatures (SAS) and Azure Active Directory (AAD)
JWT authentication. Both token types are available for use and are exposed through the TokenProvider
interface.
// TokenProvider abstracts the fetching of authentication tokens
TokenProvider interface {
GetToken(uri string) (*Token, error)
}
SAS token provider
The SAS token provider uses the namespace of the Event Hub, the name of the "Shared access policy" key and the value of the key to produce a token.
You can create new Shared access policies through the Azure portal as shown below.
You can create a SAS token provider in a couple different ways. You can build one with a key name and key value like this.
provider := sas.TokenProviderWithKey("myKeyName", "myKeyValue")
Or, you can create a token provider from environment variables like this.
// TokenProviderWithEnvironmentVars creates a new SAS TokenProvider from environment variables
//
// There are two sets of environment variables which can produce a SAS TokenProvider
//
// 1) Expected Environment Variables:
// - "EVENTHUB_KEY_NAME" the name of the Event Hub key
// - "EVENTHUB_KEY_VALUE" the secret for the Event Hub key named in "EVENTHUB_KEY_NAME"
//
// 2) Expected Environment Variable:
// - "EVENTHUB_CONNECTION_STRING" connection string from the Azure portal
provider, err := sas.NewTokenProvider(sas.TokenProviderWithEnvironmentVars())
AAD JWT token provider
The AAD JWT token provider uses Azure Active Directory to authenticate the service and acquire a token (JWT) which is
used to authenticate with Event Hubs. The authenticated identity must have Contributor
role based authorization for
the Event Hub instance. This article
provides more information about this preview feature.
The easiest way to create a JWT token provider is via environment variables.
// 1. Client Credentials: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID" and
// "AZURE_CLIENT_SECRET"
//
// 2. Client Certificate: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID",
// "AZURE_CERTIFICATE_PATH" and "AZURE_CERTIFICATE_PASSWORD"
//
// 3. Managed Service Identity (MSI): attempt to authenticate via MSI
//
//
// The Azure Environment used can be specified using the name of the Azure Environment set in "AZURE_ENVIRONMENT" var.
provider, err := aad.NewJWTProvider(aad.JWTProviderWithEnvironmentVars())
You can also provide your own adal.ServicePrincipalToken
.
config := &aad.TokenProviderConfiguration{
ResourceURI: azure.PublicCloud.ResourceManagerEndpoint,
Env: &azure.PublicCloud,
}
spToken, err := config.NewServicePrincipalToken()
if err != nil {
// handle err
}
provider, err := aad.NewJWTProvider(aad.JWTProviderWithAADToken(aadToken))
Send And Receive
The basics of messaging are sending and receiving messages. Here are the different ways you can do that.
Sending to a particular partition
By default, a Hub will send messages any of the load balanced partitions. Sometimes you want to send to only a particular partition. You can do this in two ways.
- You can supply a partition key on an event
event := eventhub.NewEventFromString("foo") event.PartitionKey = "bazz" hub.Send(ctx, event) // send event to the partition ID to which partition key hashes
- You can build a hub instance that will only send to one partition.
partitionID := "0" hub, err := eventhub.NewHubFromEnvironment(eventhub.HubWithPartitionedSender(partitionID))
Sending batches of events
Sending a batch of messages is more efficient than sending a single message.
batch := &EventBatch{
Events: []*eventhub.Event {
eventhub.NewEventFromString("one"),
eventhub.NewEventFromString("two"),
},
}
err := client.SendBatch(ctx, batch)
Receiving
When receiving messages from an Event Hub, you always need to specify the partition you'd like to receive from.
Hub.Receive
is a non-blocking call, which takes a message handler func and options. Since Event Hub is just a long
log of messages, you also have to tell it where to start from. By default, a receiver will start from the beginning
of the log, but there are options to help you specify your starting offset.
The Receive
func returns a handle to the running receiver and an error. If error is returned, the receiver was unable
to start. If error is nil, the receiver is running and can be stopped by calling Close
on the Hub
or the handle
returned.
- Receive messages from a partition from the beginning of the log
handle, err := hub.Receive(ctx, partitionID, func(ctx context.Context, event *eventhub.Event) error { // do stuff })
- Receive from the latest message onward
handle, err := hub.Receive(ctx, partitionID, handler, eventhub.ReceiveWithLatestOffset())
- Receive from a specified offset
handle, err := hub.Receive(ctx, partitionID, handler, eventhub.ReceiveWithStartingOffset(offset))
At some point, a receiver process is going to stop. You will likely want it to start back up at the spot that it stopped processing messages. This is where message offsets can be used to start from where you have left off.
The Hub
struct can be customized to use an persist.CheckpointPersister
. By default, a Hub
uses an in-memory
CheckpointPersister
, but accepts anything that implements the perist.CheckpointPersister
interface.
// CheckpointPersister provides persistence for the received offset for a given namespace, hub name, consumer group, partition Id and
// offset so that if a receiver where to be interrupted, it could resume after the last consumed event.
CheckpointPersister interface {
Write(namespace, name, consumerGroup, partitionID string, checkpoint Checkpoint) error
Read(namespace, name, consumerGroup, partitionID string) (Checkpoint, error)
}
For example, you could use the persist.FilePersister to save your checkpoints to a directory.
persister, err := persist.NewFilePersister(directoryPath)
if err != nil {
// handle err
}
hub, err := eventhub.NewHubFromEnvironment(eventhub.HubWithOffsetPersistence(persister))
Event Processor Host
The key to scale for Event Hubs is the idea of partitioned consumers. In contrast to the competing consumers pattern, the partitioned consumer pattern enables high scale by removing the contention bottleneck and facilitating end to end parallelism.
The Event Processor Host (EPH) is an intelligent consumer agent that simplifies the management of checkpointing, leasing, and parallel event readers. EPH is intended to be run across multiple processes and machines while load balancing message consumers. A message consumer in EPH will take a lease on a partition, begin processing messages and periodically write a check point to a persistent store. If at any time a new EPH process is added or lost, the remaining processors will balance the existing leases amongst the set of EPH processes.
The default implementation of partition leasing and check pointing is based on Azure Storage. Below is an example using EPH to start listening to all of the partitions of an Event Hub and print the messages received.
Receiving Events
package main
import (
"context"
"fmt"
"os"
"os/signal"
"time"
"github.com/Azure/go-autorest/autorest/azure"
"github.com/Azure/azure-amqp-common-go/conn"
"github.com/Azure/azure-amqp-common-go/sas"
"github.com/Azure/azure-event-hubs-go/eph"
"github.com/Azure/azure-event-hubs-go"
"github.com/Azure/azure-event-hubs-go/storage"
"github.com/Azure/azure-storage-blob-go/2016-05-31/azblob"
)
func main() {
// Azure Storage account information
storageAccountName := "mystorageaccount"
storageAccountKey := "Zm9vCg=="
// Azure Storage container to store leases and checkpoints
storageContainerName := "ephcontainer"
// Azure Event Hub connection string
eventHubConnStr := "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName"
parsed, err := conn.ParsedConnectionFromStr(eventHubConnStr)
if err != nil {
// handle error
}
// create a new Azure Storage Leaser / Checkpointer
cred := azblob.NewSharedKeyCredential(storageAccountName, storageAccountKey)
leaserCheckpointer, err := storage.NewStorageLeaserCheckpointer(cred, storageAccountName, storageContainerName, azure.PublicCloud)
if err != nil {
// handle error
}
// SAS token provider for Azure Event Hubs
provider, err := sas.NewTokenProvider(sas.TokenProviderWithKey(parsed.KeyName, parsed.Key))
if err != nil {
// handle error
}
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
// create a new EPH processor
processor, err := eph.New(ctx, parsed.Namespace, parsed.HubName, provider, leaserCheckpointer, leaserCheckpointer)
if err != nil {
// handle error
}
// register a message handler -- many can be registered
handlerID, err := processor.RegisterHandler(ctx,
func(c context.Context, event *eventhub.Event) error {
fmt.Println(string(event.Data))
return nil
})
if err != nil {
// handle error
}
// unregister a handler to stop that handler from receiving events
// processor.UnregisterHandler(ctx, handleID)
// start handling messages from all of the partitions balancing across multiple consumers
processor.StartNonBlocking(ctx)
// Wait for a signal to quit:
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, os.Kill)
<-signalChan
err = processor.Close(context.Background())
if err != nil {
// handle error
}
}
Examples
- HelloWorld: Producer and Consumer: an example of sending and receiving messages from an Event Hub instance.
- Batch Processing: an example of handling events in batches
Contributing
This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.microsoft.com.
When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.
This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.
See contributing.md.
License
MIT, see LICENSE.
Documentation
Overview ¶
Package eventhub provides functionality for interacting with Azure Event Hubs.
Index ¶
- Constants
- func ApplyComponentInfo(span *trace.Span)
- type BaseEntityDescription
- type Event
- type EventBatch
- type Handler
- type Hub
- func NewHub(namespace, name string, tokenProvider auth.TokenProvider, opts ...HubOption) (*Hub, error)
- func NewHubFromConnectionString(connStr string, opts ...HubOption) (*Hub, error)
- func NewHubFromEnvironment(opts ...HubOption) (*Hub, error)
- func NewHubWithNamespaceNameAndEnvironment(namespace, name string, opts ...HubOption) (*Hub, error)
- func (h *Hub) Close(ctx context.Context) error
- func (h *Hub) GetPartitionInformation(ctx context.Context, partitionID string) (*HubPartitionRuntimeInformation, error)
- func (h *Hub) GetRuntimeInformation(ctx context.Context) (*HubRuntimeInformation, error)
- func (h *Hub) Receive(ctx context.Context, partitionID string, handler Handler, ...) (*ListenerHandle, error)
- func (h *Hub) Send(ctx context.Context, event *Event, opts ...SendOption) error
- func (h *Hub) SendBatch(ctx context.Context, batch *EventBatch, opts ...SendOption) error
- type HubDescription
- type HubEntity
- type HubManagementOption
- type HubManager
- func (hm *HubManager) Delete(ctx context.Context, name string) error
- func (em HubManager) Execute(ctx context.Context, method string, entityPath string, body io.Reader) (*http.Response, error)
- func (hm *HubManager) Get(ctx context.Context, name string) (*HubEntity, error)
- func (hm *HubManager) List(ctx context.Context) ([]*HubEntity, error)
- func (em HubManager) Post(ctx context.Context, entityPath string, body []byte) (*http.Response, error)
- func (hm *HubManager) Put(ctx context.Context, name string, opts ...HubManagementOption) (*HubEntity, error)
- type HubOption
- type HubPartitionRuntimeInformation
- type HubRuntimeInformation
- type ListenerHandle
- type Manager
- type PartitionedReceiver
- type ReceiveOption
- func ReceiveFromTimestamp(t time.Time) ReceiveOption
- func ReceiveWithConsumerGroup(consumerGroup string) ReceiveOption
- func ReceiveWithEpoch(epoch int64) ReceiveOption
- func ReceiveWithLatestOffset() ReceiveOption
- func ReceiveWithPrefetchCount(prefetch uint32) ReceiveOption
- func ReceiveWithStartingOffset(offset string) ReceiveOption
- type SendOption
- type Sender
- type SystemProperties
Examples ¶
Constants ¶
const (
// DefaultConsumerGroup is the default name for a event stream consumer group
DefaultConsumerGroup = "$Default"
)
const (
// MsftVendor is the Microsoft vendor identifier
MsftVendor = "com.microsoft"
)
const (
// Version is the semantic version number
Version = "1.3.1"
)
Variables ¶
Functions ¶
func ApplyComponentInfo ¶
ApplyComponentInfo applies eventhub library and network info to the span
Types ¶
type BaseEntityDescription ¶
type BaseEntityDescription struct { InstanceMetadataSchema *string `xml:"xmlns:i,attr,omitempty"` ServiceBusSchema *string `xml:"xmlns,attr,omitempty"` }
BaseEntityDescription provides common fields which are part of Queues, Topics and Subscriptions
type Event ¶
type Event struct { Data []byte PartitionKey *string Properties map[string]interface{} ID string SystemProperties *SystemProperties // contains filtered or unexported fields }
Event is an Event Hubs message to be sent or received
func NewEventFromString ¶
NewEventFromString builds an Event from a string message
func (*Event) GetCheckpoint ¶
func (e *Event) GetCheckpoint() persist.Checkpoint
GetCheckpoint returns the checkpoint information on the Event
type EventBatch ¶
type EventBatch struct { Events []*Event PartitionKey *string Properties map[string]interface{} ID string }
EventBatch is a batch of Event Hubs messages to be sent
func NewEventBatch ¶
func NewEventBatch(events []*Event) *EventBatch
NewEventBatch builds an EventBatch from an array of Events
type Hub ¶
type Hub struct {
// contains filtered or unexported fields
}
Hub provides the ability to send and receive Event Hub messages
Example (HelloWorld) ¶
Output: Hello World!
Example (WebSocket) ¶
Output: this message was sent and received via web socket!!
func NewHub ¶
func NewHub(namespace, name string, tokenProvider auth.TokenProvider, opts ...HubOption) (*Hub, error)
NewHub creates a new Event Hub client for sending and receiving messages
func NewHubFromConnectionString ¶
NewHubFromConnectionString creates a new Event Hub client for sending and receiving messages from a connection string formatted like the following:
Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName
func NewHubFromEnvironment ¶
NewHubFromEnvironment creates a new Event Hub client for sending and receiving messages from environment variables
Expected Environment Variables:
- "EVENTHUB_NAMESPACE" the namespace of the Event Hub instance - "EVENTHUB_NAME" the name of the Event Hub instance
This method depends on NewHubWithNamespaceNameAndEnvironment which will attempt to build a token provider from environment variables. If unable to build a AAD Token Provider it will fall back to a SAS token provider. If neither can be built, it will return error.
SAS TokenProvider environment variables:
There are two sets of environment variables which can produce a SAS TokenProvider
1) Expected Environment Variables: - "EVENTHUB_NAMESPACE" the namespace of the Event Hub instance - "EVENTHUB_KEY_NAME" the name of the Event Hub key - "EVENTHUB_KEY_VALUE" the secret for the Event Hub key named in "EVENTHUB_KEY_NAME" 2) Expected Environment Variable: - "EVENTHUB_CONNECTION_STRING" connection string from the Azure portal
AAD TokenProvider environment variables:
1. client Credentials: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID" and "AZURE_CLIENT_SECRET" 2. client Certificate: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CERTIFICATE_PATH" and "AZURE_CERTIFICATE_PASSWORD" 3. Managed Service Identity (MSI): attempt to authenticate via MSI
The Azure Environment used can be specified using the name of the Azure Environment set in the AZURE_ENVIRONMENT var.
func NewHubWithNamespaceNameAndEnvironment ¶
NewHubWithNamespaceNameAndEnvironment creates a new Event Hub client for sending and receiving messages from environment variables with supplied namespace and name which will attempt to build a token provider from environment variables. If unable to build a AAD Token Provider it will fall back to a SAS token provider. If neither can be built, it will return error.
SAS TokenProvider environment variables:
There are two sets of environment variables which can produce a SAS TokenProvider
1) Expected Environment Variables: - "EVENTHUB_KEY_NAME" the name of the Event Hub key - "EVENTHUB_KEY_VALUE" the secret for the Event Hub key named in "EVENTHUB_KEY_NAME" 2) Expected Environment Variable: - "EVENTHUB_CONNECTION_STRING" connection string from the Azure portal
AAD TokenProvider environment variables:
1. client Credentials: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID" and "AZURE_CLIENT_SECRET" 2. client Certificate: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CERTIFICATE_PATH" and "AZURE_CERTIFICATE_PASSWORD" 3. Managed Service Identity (MSI): attempt to authenticate via MSI on the default local MSI internally addressable IP and port. See: adal.GetMSIVMEndpoint()
The Azure Environment used can be specified using the name of the Azure Environment set in the AZURE_ENVIRONMENT var.
func (*Hub) GetPartitionInformation ¶
func (h *Hub) GetPartitionInformation(ctx context.Context, partitionID string) (*HubPartitionRuntimeInformation, error)
GetPartitionInformation fetches runtime information about a specific partition from the Event Hub management node
func (*Hub) GetRuntimeInformation ¶
func (h *Hub) GetRuntimeInformation(ctx context.Context) (*HubRuntimeInformation, error)
GetRuntimeInformation fetches runtime information from the Event Hub management node
func (*Hub) Receive ¶
func (h *Hub) Receive(ctx context.Context, partitionID string, handler Handler, opts ...ReceiveOption) (*ListenerHandle, error)
Receive subscribes for messages sent to the provided entityPath.
The context passed into Receive is only used to limit the amount of time the caller will wait for the Receive method to connect to the Event Hub. The context passed in does not control the lifetime of Receive after connection.
If Receive encounters an initial error setting up the connection, an error will be returned.
If Receive starts successfully, a *ListenerHandle and a nil error will be returned. The ListenerHandle exposes methods which will help manage the life span of the receiver.
ListenerHandle.Close(ctx) closes the receiver
ListenerHandle.Done() signals the consumer when the receiver has stopped
ListenerHandle.Err() provides the last error the listener encountered and was unable to recover from
func (*Hub) Send ¶
Send sends an event to the Event Hub
Send will retry sending the message for as long as the context allows
func (*Hub) SendBatch ¶
func (h *Hub) SendBatch(ctx context.Context, batch *EventBatch, opts ...SendOption) error
SendBatch sends an EventBatch to the Event Hub
SendBatch will retry sending the message for as long as the context allows
type HubDescription ¶
type HubDescription struct { XMLName xml.Name `xml:"EventHubDescription"` MessageRetentionInDays *int32 `xml:"MessageRetentionInDays,omitempty"` SizeInBytes *int64 `xml:"SizeInBytes,omitempty"` Status *eventhub.EntityStatus `xml:"Status,omitempty"` CreatedAt *date.Time `xml:"CreatedAt,omitempty"` UpdatedAt *date.Time `xml:"UpdatedAt,omitempty"` PartitionCount *int32 `xml:"PartitionCount,omitempty"` PartitionIDs *[]string `xml:"PartitionIds>string,omitempty"` EntityAvailabilityStatus *string `xml:"EntityAvailabilityStatus,omitempty"` BaseEntityDescription }
HubDescription is the content type for Event Hub management requests
type HubEntity ¶
type HubEntity struct { *HubDescription Name string }
HubEntity is the Azure Event Hub description of a Hub for management activities
type HubManagementOption ¶
type HubManagementOption func(description *HubDescription) error
HubManagementOption provides structure for configuring new Event Hubs
func HubWithMessageRetentionInDays ¶
func HubWithMessageRetentionInDays(days int32) HubManagementOption
HubWithMessageRetentionInDays configures an Event Hub to retain messages for that number of days
func HubWithPartitionCount ¶
func HubWithPartitionCount(count int32) HubManagementOption
HubWithPartitionCount configures an Event Hub to have the specified number of partitions. More partitions == more throughput
type HubManager ¶
type HubManager struct {
// contains filtered or unexported fields
}
HubManager provides CRUD functionality for Event Hubs
func NewHubManagerFromAzureEnvironment ¶
func NewHubManagerFromAzureEnvironment(namespace string, tokenProvider auth.TokenProvider, env azure.Environment) (*HubManager, error)
NewHubManagerFromAzureEnvironment builds a HubManager from a Event Hub name, SAS or AAD token provider and Azure Environment
func NewHubManagerFromConnectionString ¶
func NewHubManagerFromConnectionString(connStr string) (*HubManager, error)
NewHubManagerFromConnectionString builds a HubManager from an Event Hub connection string
func (*HubManager) Delete ¶
func (hm *HubManager) Delete(ctx context.Context, name string) error
Delete deletes an Event Hub entity by name
func (HubManager) Execute ¶
func (em HubManager) Execute(ctx context.Context, method string, entityPath string, body io.Reader) (*http.Response, error)
Execute performs an HTTP request given a http method, path and body
func (*HubManager) List ¶
func (hm *HubManager) List(ctx context.Context) ([]*HubEntity, error)
List fetches all of the Hub for an Event Hubs Namespace
func (HubManager) Post ¶
func (em HubManager) Post(ctx context.Context, entityPath string, body []byte) (*http.Response, error)
Post performs an HTTP POST for a given entity path and body
func (*HubManager) Put ¶
func (hm *HubManager) Put(ctx context.Context, name string, opts ...HubManagementOption) (*HubEntity, error)
Put creates or updates an Event Hubs Hub
type HubOption ¶
HubOption provides structure for configuring new Event Hub clients. For building new Event Hubs, see HubManagementOption.
func HubWithEnvironment ¶
func HubWithEnvironment(env azure.Environment) HubOption
HubWithEnvironment configures the Hub to use the specified environment.
By default, the Hub instance will use Azure US Public cloud environment
func HubWithOffsetPersistence ¶
func HubWithOffsetPersistence(offsetPersister persist.CheckpointPersister) HubOption
HubWithOffsetPersistence configures the Hub instance to read and write offsets so that if a Hub is interrupted, it can resume after the last consumed event.
func HubWithPartitionedSender ¶
HubWithPartitionedSender configures the Hub instance to send to a specific event Hub partition
func HubWithUserAgent ¶
HubWithUserAgent configures the Hub to append the given string to the user agent sent to the server
This option can be specified multiple times to add additional segments.
Max user agent length is specified by the const maxUserAgentLen.
func HubWithWebSocketConnection ¶
func HubWithWebSocketConnection() HubOption
HubWithWebSocketConnection configures the Hub to use a WebSocket connection wss:// rather than amqps://
type HubPartitionRuntimeInformation ¶
type HubPartitionRuntimeInformation struct { HubPath string `mapstructure:"name"` PartitionID string `mapstructure:"partition"` BeginningSequenceNumber int64 `mapstructure:"begin_sequence_number"` LastSequenceNumber int64 `mapstructure:"last_enqueued_sequence_number"` LastEnqueuedOffset string `mapstructure:"last_enqueued_offset"` LastEnqueuedTimeUtc time.Time `mapstructure:"last_enqueued_time_utc"` }
HubPartitionRuntimeInformation provides management node information about a given Event Hub partition
type HubRuntimeInformation ¶
type HubRuntimeInformation struct { Path string `mapstructure:"name"` CreatedAt time.Time `mapstructure:"created_at"` PartitionCount int `mapstructure:"partition_count"` PartitionIDs []string `mapstructure:"partition_ids"` }
HubRuntimeInformation provides management node information about a given Event Hub instance
type ListenerHandle ¶
type ListenerHandle struct {
// contains filtered or unexported fields
}
ListenerHandle provides the ability to close or listen to the close of a Receiver
func (*ListenerHandle) Close ¶
func (lc *ListenerHandle) Close(ctx context.Context) error
Close will close the listener
func (*ListenerHandle) Done ¶
func (lc *ListenerHandle) Done() <-chan struct{}
Done will close the channel when the listener has stopped
func (*ListenerHandle) Err ¶
func (lc *ListenerHandle) Err() error
Err will return the last error encountered
type Manager ¶
type Manager interface { GetRuntimeInformation(context.Context) (HubRuntimeInformation, error) GetPartitionInformation(context.Context, string) (HubPartitionRuntimeInformation, error) }
Manager provides the ability to query management node information about a node
type PartitionedReceiver ¶
type PartitionedReceiver interface {
Receive(ctx context.Context, partitionID string, handler Handler, opts ...ReceiveOption) (ListenerHandle, error)
}
PartitionedReceiver provides the ability to receive messages from a given partition
type ReceiveOption ¶
type ReceiveOption func(receiver *receiver) error
ReceiveOption provides a structure for configuring receivers
func ReceiveFromTimestamp ¶
func ReceiveFromTimestamp(t time.Time) ReceiveOption
ReceiveFromTimestamp configures the receiver to start receiving from a specific point in time in the event stream
func ReceiveWithConsumerGroup ¶
func ReceiveWithConsumerGroup(consumerGroup string) ReceiveOption
ReceiveWithConsumerGroup configures the receiver to listen to a specific consumer group
func ReceiveWithEpoch ¶
func ReceiveWithEpoch(epoch int64) ReceiveOption
ReceiveWithEpoch configures the receiver to use an epoch. Specifying an epoch for a receiver will cause any receiver with a lower epoch value to be disconnected from the message broker. If a receiver attempts to start with a lower epoch than the broker currently knows for a given partition, the broker will respond with an error on initiation of the receive request.
Ownership enforcement: Once you created an epoch based receiver, you cannot create a non-epoch receiver to the same consumer group / partition combo until all receivers to the combo are closed.
Ownership stealing: If a receiver with higher epoch value is created for a consumer group / partition combo, any older epoch receiver to that combo will be force closed.
func ReceiveWithLatestOffset ¶
func ReceiveWithLatestOffset() ReceiveOption
ReceiveWithLatestOffset configures the receiver to start at a given position in the event stream
func ReceiveWithPrefetchCount ¶
func ReceiveWithPrefetchCount(prefetch uint32) ReceiveOption
ReceiveWithPrefetchCount configures the receiver to attempt to fetch as many messages as the prefetch amount
func ReceiveWithStartingOffset ¶
func ReceiveWithStartingOffset(offset string) ReceiveOption
ReceiveWithStartingOffset configures the receiver to start at a given position in the event stream
type SendOption ¶
SendOption provides a way to customize a message on sending
func SendWithMessageID ¶
func SendWithMessageID(messageID string) SendOption
SendWithMessageID configures the message with a message ID
type Sender ¶
type Sender interface { Send(ctx context.Context, event *Event, opts ...SendOption) error SendBatch(ctx context.Context, batch *EventBatch, opts ...SendOption) error }
Sender provides the ability to send a messages
type SystemProperties ¶
type SystemProperties struct { SequenceNumber *int64 `mapstructure:"x-opt-sequence-number"` // unique sequence number of the message EnqueuedTime *time.Time `mapstructure:"x-opt-enqueued-time"` // time the message landed in the message queue Offset *int64 `mapstructure:"x-opt-offset"` PartitionID *int16 `mapstructure:"x-opt-partition-id"` PartitionKey *string `mapstructure:"x-opt-partition-key"` }
SystemProperties are used to store properties that are set by the system.
Source Files
Directories
Path | Synopsis |
---|---|
_examples
|
|
Package atom contains base data structures for use in the Azure Event Hubs management HTTP API
|
Package atom contains base data structures for use in the Azure Event Hubs management HTTP API |
Package eph provides functionality for balancing load of Event Hub receivers through scheduling receivers across processes and machines.
|
Package eph provides functionality for balancing load of Event Hub receivers through scheduling receivers across processes and machines. |
internal
|
|
test
Package test is an internal package to handle common test setup
|
Package test is an internal package to handle common test setup |
Package storage provides implementations for Checkpointer and Leaser from package eph for persisting leases and checkpoints for the Event Processor Host using Azure Storage as a durable store.
|
Package storage provides implementations for Checkpointer and Leaser from package eph for persisting leases and checkpoints for the Event Processor Host using Azure Storage as a durable store. |