Version: v1.3.1

This package is not in the latest version of its module.

Published: Jun 3, 2019 License: MIT Imports: 20 Imported by: 10



Package eph provides functionality for balancing load of Event Hub receivers through scheduling receivers across processes and machines.



const (
	// DefaultLeaseRenewalInterval defines the default amount of time between lease renewal attempts
	DefaultLeaseRenewalInterval = 10 * time.Second

	// DefaultLeaseDuration defines the default amount of time a lease is valid
	DefaultLeaseDuration = 60 * time.Second
)
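
As a rough illustration of how the two defaults relate, the sketch below copies the documented values into local constants (package eph itself is not imported) and computes how many renewal attempts fit inside one lease period. The helper `renewalsPerLease` is hypothetical and not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of the documented defaults; the real constants live in package eph.
const (
	defaultLeaseRenewalInterval = 10 * time.Second
	defaultLeaseDuration        = 60 * time.Second
)

// renewalsPerLease (hypothetical helper) reports how many renewal attempts
// fit inside a single lease period.
func renewalsPerLease(duration, interval time.Duration) int {
	if interval <= 0 {
		return 0
	}
	return int(duration / interval)
}

func main() {
	fmt.Println(renewalsPerLease(defaultLeaseDuration, defaultLeaseRenewalInterval)) // prints: 6
}
```

With these values a host gets several renewal attempts before a lease lapses, so a single failed attempt need not cost it the partition.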




type Checkpointer

type Checkpointer interface {
	GetCheckpoint(ctx context.Context, partitionID string) (persist.Checkpoint, bool)
	EnsureCheckpoint(ctx context.Context, partitionID string) (persist.Checkpoint, error)
	UpdateCheckpoint(ctx context.Context, partitionID string, checkpoint persist.Checkpoint) error
	DeleteCheckpoint(ctx context.Context, partitionID string) error
}

Checkpointer interface provides the ability to persist durable checkpoints for event processors
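
A minimal in-memory sketch of the four Checkpointer operations follows. It is illustrative only: `checkpoint` is a local stand-in for persist.Checkpoint with assumed fields, `memoryCheckpointer` is a hypothetical type, and the choice to reject updates for partitions that were never ensured is an assumption rather than documented behavior.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// checkpoint is a local stand-in for persist.Checkpoint; its fields are assumed.
type checkpoint struct {
	Offset         string
	SequenceNumber int64
}

// memoryCheckpointer (hypothetical) keeps checkpoints in a mutex-guarded map.
type memoryCheckpointer struct {
	mu    sync.Mutex
	store map[string]checkpoint
}

func newMemoryCheckpointer() *memoryCheckpointer {
	return &memoryCheckpointer{store: make(map[string]checkpoint)}
}

// GetCheckpoint returns the stored checkpoint and whether one exists.
func (m *memoryCheckpointer) GetCheckpoint(_ context.Context, partitionID string) (checkpoint, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	cp, ok := m.store[partitionID]
	return cp, ok
}

// EnsureCheckpoint returns the existing checkpoint or creates a zero-valued one.
func (m *memoryCheckpointer) EnsureCheckpoint(_ context.Context, partitionID string) (checkpoint, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	cp, ok := m.store[partitionID]
	if !ok {
		m.store[partitionID] = cp
	}
	return cp, nil
}

// UpdateCheckpoint overwrites the checkpoint; it fails if none was ensured first.
func (m *memoryCheckpointer) UpdateCheckpoint(_ context.Context, partitionID string, cp checkpoint) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.store[partitionID]; !ok {
		return fmt.Errorf("no checkpoint for partition %q", partitionID)
	}
	m.store[partitionID] = cp
	return nil
}

// DeleteCheckpoint removes the checkpoint; deleting a missing one is not an error.
func (m *memoryCheckpointer) DeleteCheckpoint(_ context.Context, partitionID string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.store, partitionID)
	return nil
}

// demo walks through ensure, update, and read for partition "0".
func demo() (checkpoint, bool) {
	ctx := context.Background()
	c := newMemoryCheckpointer()
	if _, err := c.EnsureCheckpoint(ctx, "0"); err != nil {
		return checkpoint{}, false
	}
	if err := c.UpdateCheckpoint(ctx, "0", checkpoint{Offset: "128", SequenceNumber: 7}); err != nil {
		return checkpoint{}, false
	}
	return c.GetCheckpoint(ctx, "0")
}

func main() {
	cp, ok := demo()
	fmt.Println(cp.Offset, cp.SequenceNumber, ok) // prints: 128 7 true
}
```

An in-memory store like this loses its checkpoints on restart, so it only suits tests; the interface describes durable checkpoints.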

type EventProcessHostSetter

type EventProcessHostSetter interface {
	SetEventHostProcessor(eph *EventProcessorHost)
}

EventProcessHostSetter provides the ability to set an EventProcessorHost on the implementor

type EventProcessorHost

type EventProcessorHost struct {
	// contains filtered or unexported fields
}

EventProcessorHost provides functionality for coordinating and balancing load across multiple Event Hub partitions

func New

func New(ctx context.Context, namespace, hubName string, tokenProvider auth.TokenProvider, leaser Leaser, checkpointer Checkpointer, opts ...EventProcessorHostOption) (*EventProcessorHost, error)

New constructs a new instance of an EventProcessorHost

func NewFromConnectionString added in v1.0.0

func NewFromConnectionString(ctx context.Context, connStr string, leaser Leaser, checkpointer Checkpointer, opts ...EventProcessorHostOption) (*EventProcessorHost, error)

NewFromConnectionString builds a new Event Processor Host from an Event Hub connection string which can be found in the Azure portal

func (*EventProcessorHost) Close

func (h *EventProcessorHost) Close(ctx context.Context) error

Close stops the EventProcessorHost from processing messages

func (*EventProcessorHost) GetName

func (h *EventProcessorHost) GetName() string

GetName returns the name of the EventProcessorHost

func (*EventProcessorHost) GetPartitionIDs

func (h *EventProcessorHost) GetPartitionIDs() []string

GetPartitionIDs fetches the partition IDs for the Event Hub

func (*EventProcessorHost) PartitionIDsBeingProcessed

func (h *EventProcessorHost) PartitionIDsBeingProcessed() []string

PartitionIDsBeingProcessed returns the partition IDs currently receiving messages

func (*EventProcessorHost) RegisterHandler added in v0.4.0

func (h *EventProcessorHost) RegisterHandler(ctx context.Context, handler eventhub.Handler) (HandlerID, error)

RegisterHandler will register an event handler which will receive events after Start or StartNonBlocking is called

func (*EventProcessorHost) RegisteredHandlerIDs added in v0.4.0

func (h *EventProcessorHost) RegisteredHandlerIDs() []HandlerID

RegisteredHandlerIDs will return the registered event handler IDs

func (*EventProcessorHost) Start

func (h *EventProcessorHost) Start(ctx context.Context) error

Start begins processing of messages for registered handlers on the EventProcessorHost. The call is blocking.

func (*EventProcessorHost) StartNonBlocking

func (h *EventProcessorHost) StartNonBlocking(ctx context.Context) error

StartNonBlocking begins processing of messages for registered handlers

func (*EventProcessorHost) UnregisterHandler added in v0.4.0

func (h *EventProcessorHost) UnregisterHandler(ctx context.Context, id HandlerID)

UnregisterHandler will remove an event handler from receiving events, and will close the EventProcessorHost if it is the last handler registered.
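
The register/unregister lifecycle described above can be sketched with a small stand-in registry. Nothing here uses the real package: `registry`, `handler`, and `handlerID` are hypothetical local types, and a counter replaces the UUID that HandlerID is documented to hold, purely to keep the sketch short.

```go
package main

import (
	"fmt"
	"sync"
)

// handlerID is a local stand-in for eph.HandlerID.
type handlerID string

// handler is a simplified stand-in for eventhub.Handler.
type handler func(event string) error

// registry (hypothetical) tracks handlers and closes itself when the last one is removed.
type registry struct {
	mu       sync.Mutex
	next     int
	handlers map[handlerID]handler
	closed   bool
}

func newRegistry() *registry {
	return &registry{handlers: make(map[handlerID]handler)}
}

// register stores the handler under a fresh ID. A counter is used instead of
// a UUID only for brevity.
func (r *registry) register(h handler) handlerID {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.next++
	id := handlerID(fmt.Sprintf("handler-%d", r.next))
	r.handlers[id] = h
	return id
}

// unregister removes the handler and marks the registry closed once it is empty.
func (r *registry) unregister(id handlerID) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.handlers, id)
	if len(r.handlers) == 0 {
		r.closed = true
	}
}

// demo registers two handlers, removes both, and reports the closed flag
// after each removal.
func demo() (afterFirst, afterSecond bool) {
	r := newRegistry()
	noop := func(string) error { return nil }
	a := r.register(noop)
	b := r.register(noop)
	r.unregister(a)
	afterFirst = r.closed
	r.unregister(b)
	afterSecond = r.closed
	return afterFirst, afterSecond
}

func main() {
	first, second := demo()
	fmt.Println(first, second) // prints: false true
}
```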

type EventProcessorHostOption

type EventProcessorHostOption func(host *EventProcessorHost) error

EventProcessorHostOption provides configuration options for an EventProcessorHost

func WithConsumerGroup added in v1.0.1

func WithConsumerGroup(consumerGroup string) EventProcessorHostOption

WithConsumerGroup will configure an EventProcessorHost to use a specific consumer group

func WithEnvironment added in v1.0.0

func WithEnvironment(env azure.Environment) EventProcessorHostOption

WithEnvironment will configure an EventProcessorHost to use the specified Azure Environment

func WithNoBanner added in v0.2.0

func WithNoBanner() EventProcessorHostOption

WithNoBanner will configure an EventProcessorHost to not output the banner upon start
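
EventProcessorHostOption follows the functional-options pattern: each option is a function that mutates the host and may return an error. The sketch below reproduces that shape with local stand-ins. The `host` struct, its fields, the `"$Default"` starting value, and the empty-string check are all assumptions made for illustration, not the package's internals.

```go
package main

import (
	"errors"
	"fmt"
)

// host is a stand-in for EventProcessorHost with two illustrative fields.
type host struct {
	consumerGroup string
	noBanner      bool
}

// option mirrors the shape of EventProcessorHostOption.
type option func(h *host) error

// withConsumerGroup sets the consumer group; rejecting "" is an assumption.
func withConsumerGroup(group string) option {
	return func(h *host) error {
		if group == "" {
			return errors.New("consumer group must not be empty")
		}
		h.consumerGroup = group
		return nil
	}
}

// withNoBanner suppresses the start-up banner.
func withNoBanner() option {
	return func(h *host) error {
		h.noBanner = true
		return nil
	}
}

// newHost applies each option in order and stops at the first error.
func newHost(opts ...option) (*host, error) {
	h := &host{consumerGroup: "$Default"} // assumed default
	for _, opt := range opts {
		if err := opt(h); err != nil {
			return nil, err
		}
	}
	return h, nil
}

func main() {
	h, err := newHost(withConsumerGroup("analytics"), withNoBanner())
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(h.consumerGroup, h.noBanner) // prints: analytics true
}
```

Because options return an error, a constructor built this way can reject invalid configuration before any connection is attempted.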

type HandlerID added in v0.4.0

type HandlerID string

HandlerID is a UUID in string format that identifies a registered handler

type Lease

type Lease struct {
	PartitionID string `json:"partitionID"`
	Epoch       int64  `json:"epoch"`
	Owner       string `json:"owner"`
}

Lease represents the information needed to coordinate partitions

func (*Lease) GetEpoch

func (l *Lease) GetEpoch() int64

GetEpoch returns the value of the epoch

func (*Lease) GetOwner

func (l *Lease) GetOwner() string

GetOwner returns the owner of the lease

func (*Lease) GetPartitionID

func (l *Lease) GetPartitionID() string

GetPartitionID returns the partition which belongs to this lease

func (*Lease) IncrementEpoch

func (l *Lease) IncrementEpoch() int64

IncrementEpoch increases the epoch on the lease by one

func (*Lease) String added in v0.2.1

func (l *Lease) String() string
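
To show how the JSON tags on Lease shape its serialized form, the sketch below mirrors the three exported fields in a local `lease` type and marshals it after one epoch increment. The local type and the assumption that the increment returns the new epoch are illustrative; the real package is not imported.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// lease mirrors the exported fields and JSON tags of eph.Lease.
type lease struct {
	PartitionID string `json:"partitionID"`
	Epoch       int64  `json:"epoch"`
	Owner       string `json:"owner"`
}

// incrementEpoch bumps the epoch by one and returns the new value
// (returning the new value is an assumption).
func (l *lease) incrementEpoch() int64 {
	l.Epoch++
	return l.Epoch
}

// encode bumps the epoch of a fresh lease and returns its JSON form.
func encode() (string, error) {
	l := &lease{PartitionID: "0", Owner: "host-a"}
	l.incrementEpoch()
	b, err := json.Marshal(l)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	s, err := encode()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(s) // prints: {"partitionID":"0","epoch":1,"owner":"host-a"}
}
```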

type LeaseMarker

type LeaseMarker interface {
	GetPartitionID() string
	IsExpired(context.Context) bool
	GetOwner() string
	IncrementEpoch() int64
	GetEpoch() int64
	String() string
}

LeaseMarker provides the functionality expected of a partition lease with an owner

type Leaser

type Leaser interface {
	GetLeases(ctx context.Context) ([]LeaseMarker, error)
	EnsureLease(ctx context.Context, partitionID string) (LeaseMarker, error)
	DeleteLease(ctx context.Context, partitionID string) error
	AcquireLease(ctx context.Context, partitionID string) (LeaseMarker, bool, error)
	RenewLease(ctx context.Context, partitionID string) (LeaseMarker, bool, error)
	ReleaseLease(ctx context.Context, partitionID string) (bool, error)
	UpdateLease(ctx context.Context, partitionID string) (LeaseMarker, bool, error)
}

Leaser provides the functionality needed to persist and coordinate leases for partitions
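
One way to picture lease coordination is an in-memory leaser in which two hosts compete for a partition. The sketch below covers only acquire and renew, uses hypothetical `memLease` and `memLeaser` types with simplified signatures (no context or error), and injects a fake clock so the outcome is deterministic. The expiry rule and the epoch bump on every acquisition are assumptions, and the code is single-goroutine only.

```go
package main

import (
	"fmt"
	"time"
)

// memLease is a stand-in for a LeaseMarker implementation.
type memLease struct {
	owner     string
	epoch     int64
	expiresAt time.Time
}

// memLeaser (hypothetical) acts for one owner against a shared lease map.
type memLeaser struct {
	owner    string
	duration time.Duration
	now      func() time.Time // injectable clock
	leases   map[string]*memLease
}

// acquire takes the lease unless another owner holds it and it has not expired.
func (m *memLeaser) acquire(partitionID string) (*memLease, bool) {
	l, ok := m.leases[partitionID]
	if !ok {
		l = &memLease{}
		m.leases[partitionID] = l
	}
	heldByOther := l.owner != "" && l.owner != m.owner && m.now().Before(l.expiresAt)
	if heldByOther {
		return l, false
	}
	l.owner = m.owner
	l.epoch++
	l.expiresAt = m.now().Add(m.duration)
	return l, true
}

// renew extends a lease this owner still holds; it does not change the epoch.
func (m *memLeaser) renew(partitionID string) (*memLease, bool) {
	l, ok := m.leases[partitionID]
	if !ok || l.owner != m.owner || !m.now().Before(l.expiresAt) {
		return l, false
	}
	l.expiresAt = m.now().Add(m.duration)
	return l, true
}

// demo lets host-b try to take host-a's lease at three points in time.
func demo() (early, afterRenew, late bool, epoch int64) {
	clock := time.Date(2019, time.June, 3, 0, 0, 0, 0, time.UTC)
	now := func() time.Time { return clock }
	shared := make(map[string]*memLease)
	a := &memLeaser{owner: "host-a", duration: time.Minute, now: now, leases: shared}
	b := &memLeaser{owner: "host-b", duration: time.Minute, now: now, leases: shared}

	a.acquire("0")                      // t+0s: host-a holds the lease until t+60s
	clock = clock.Add(30 * time.Second) // t+30s
	_, early = b.acquire("0")           // refused: lease still valid
	a.renew("0")                        // lease now valid until t+90s
	clock = clock.Add(31 * time.Second) // t+61s
	_, afterRenew = b.acquire("0")      // refused: renewal extended the lease
	clock = clock.Add(30 * time.Second) // t+91s
	var l *memLease
	l, late = b.acquire("0") // granted: lease expired
	return early, afterRenew, late, l.epoch
}

func main() {
	fmt.Println(demo()) // prints: false false true 2
}
```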

type Receiver

type Receiver interface {
	Receive(ctx context.Context, handler eventhub.Handler) (close func() error, err error)
}

Receiver provides the ability to handle Event Hub events
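
The Receive signature returns a close function alongside the error. A sketch of that shape, under assumptions: `sliceReceiver` is a hypothetical type that replays a fixed list of strings, `handler` is a simplified stand-in for eventhub.Handler, and the choice to have the close function cancel delivery and wait for the goroutine is one plausible design, not the package's documented behavior.

```go
package main

import (
	"context"
	"fmt"
	"strings"
	"sync"
)

// handler is a simplified stand-in for eventhub.Handler.
type handler func(ctx context.Context, event string) error

// sliceReceiver (hypothetical) replays a fixed list of events.
type sliceReceiver struct {
	events []string
}

// Receive starts delivering events on a goroutine and returns a function
// that stops delivery and waits for that goroutine to exit.
func (r *sliceReceiver) Receive(ctx context.Context, h handler) (func() error, error) {
	ctx, cancel := context.WithCancel(ctx)
	done := make(chan struct{})
	go func() {
		defer close(done)
		for _, e := range r.events {
			if ctx.Err() != nil {
				return
			}
			if err := h(ctx, e); err != nil {
				return
			}
		}
	}()
	closeFn := func() error {
		cancel()
		<-done
		return nil
	}
	return closeFn, nil
}

// demo receives three events, waits for all of them, then closes the receiver.
func demo() (string, error) {
	r := &sliceReceiver{events: []string{"a", "b", "c"}}
	var wg sync.WaitGroup
	wg.Add(len(r.events))
	var got []string
	closeFn, err := r.Receive(context.Background(), func(_ context.Context, e string) error {
		got = append(got, e) // only the receiver goroutine writes to got
		wg.Done()
		return nil
	})
	if err != nil {
		return "", err
	}
	wg.Wait() // all events delivered; reads of got below are safe
	if err := closeFn(); err != nil {
		return "", err
	}
	return strings.Join(got, ","), nil
}

func main() {
	s, err := demo()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(s) // prints: a,b,c
}
```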

type StoreProvisioner

type StoreProvisioner interface {
	StoreExists(ctx context.Context) (bool, error)
	EnsureStore(ctx context.Context) error
	DeleteStore(ctx context.Context) error
}

StoreProvisioner provides CRUD functionality for Lease and Checkpoint storage
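
The three StoreProvisioner methods suggest an ensure-style workflow in which EnsureStore can be called repeatedly without harm. The sketch below shows that idea with a hypothetical in-memory `memoryStore`. The idempotent behavior and the `creations` counter are assumptions for illustration only.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// memoryStore (hypothetical) tracks whether a backing store exists.
type memoryStore struct {
	mu        sync.Mutex
	exists    bool
	creations int // number of times the store was actually created
}

// StoreExists reports whether the store is currently present.
func (s *memoryStore) StoreExists(_ context.Context) (bool, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.exists, nil
}

// EnsureStore creates the store only if it is missing, so repeated calls are safe.
func (s *memoryStore) EnsureStore(_ context.Context) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.exists {
		s.exists = true
		s.creations++
	}
	return nil
}

// DeleteStore removes the store; deleting a missing store is not an error.
func (s *memoryStore) DeleteStore(_ context.Context) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.exists = false
	return nil
}

// demo ensures the store twice, then deletes it.
func demo() (creations int, existsAfterDelete bool) {
	ctx := context.Background()
	s := &memoryStore{}
	_ = s.EnsureStore(ctx)
	_ = s.EnsureStore(ctx)
	_ = s.DeleteStore(ctx)
	existsAfterDelete, _ = s.StoreExists(ctx)
	return s.creations, existsAfterDelete
}

func main() {
	fmt.Println(demo()) // prints: 1 false
}
```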
