eph

package
v0.0.0-...-01c0456 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 18, 2018 License: MIT Imports: 18 Imported by: 0

Documentation

Overview

Package eph provides functionality for balancing load of Event Hub receivers through scheduling receivers across processes and machines.

Index

Constants

View Source
const (
	// DefaultLeaseRenewalInterval defines the default amount of time between lease renewal attempts
	DefaultLeaseRenewalInterval = 20 * time.Second

	// DefaultLeaseDuration defines the default amount of time a lease is valid
	DefaultLeaseDuration = 45 * time.Second
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Checkpointer

type Checkpointer interface {
	io.Closer
	StoreProvisioner
	EventProcessHostSetter
	GetCheckpoint(ctx context.Context, partitionID string) (persist.Checkpoint, bool)
	EnsureCheckpoint(ctx context.Context, partitionID string) (persist.Checkpoint, error)
	UpdateCheckpoint(ctx context.Context, partitionID string, checkpoint persist.Checkpoint) error
	DeleteCheckpoint(ctx context.Context, partitionID string) error
}

Checkpointer interface provides the ability to persist durable checkpoints for event processors

type EventProcessHostSetter

type EventProcessHostSetter interface {
	SetEventHostProcessor(eph *EventProcessorHost)
}

EventProcessHostSetter provides the ability to set an EventHostProcessor on the implementor

type EventProcessorHost

type EventProcessorHost struct {
	// contains filtered or unexported fields
}

EventProcessorHost provides functionality for coordinating and balancing load across multiple Event Hub partitions

func New

func New(ctx context.Context, namespace, hubName string, tokenProvider auth.TokenProvider, leaser Leaser, checkpointer Checkpointer, opts ...EventProcessorHostOption) (*EventProcessorHost, error)

New constructs a new instance of an EventHostProcessor

func (*EventProcessorHost) Close

func (h *EventProcessorHost) Close(ctx context.Context) error

Close stops the EventHostProcessor from processing messages

func (*EventProcessorHost) GetName

func (h *EventProcessorHost) GetName() string

GetName returns the name of the EventProcessorHost

func (*EventProcessorHost) GetPartitionIDs

func (h *EventProcessorHost) GetPartitionIDs() []string

GetPartitionIDs fetches the partition IDs for the Event Hub

func (*EventProcessorHost) PartitionIDsBeingProcessed

func (h *EventProcessorHost) PartitionIDsBeingProcessed() []string

PartitionIDsBeingProcessed returns the partition IDs currently receiving messages

func (*EventProcessorHost) Receive

func (h *EventProcessorHost) Receive(handler eventhub.Handler) (close func() error, err error)

Receive provides the ability to register a handler for processing Event Hub messages

func (*EventProcessorHost) Start

func (h *EventProcessorHost) Start(ctx context.Context) error

Start begins processing of messages for registered handlers on the EventHostProcessor. The call is blocking.

func (*EventProcessorHost) StartNonBlocking

func (h *EventProcessorHost) StartNonBlocking(ctx context.Context) error

StartNonBlocking begins processing of messages for registered handlers

type EventProcessorHostOption

type EventProcessorHostOption func(host *EventProcessorHost) error

EventProcessorHostOption provides configuration options for an EventProcessorHost

func WithNoBanner

func WithNoBanner() EventProcessorHostOption

WithNoBanner will configure an EventProcessorHost to not output the banner upon start

type Lease

type Lease struct {
	PartitionID string `json:"partitionID"`
	Epoch       int64  `json:"epoch"`
	Owner       string `json:"owner"`
}

Lease represents the information needed to coordinate partitions

func (*Lease) GetEpoch

func (l *Lease) GetEpoch() int64

GetEpoch returns the value of the epoch

func (*Lease) GetOwner

func (l *Lease) GetOwner() string

GetOwner returns the owner of the lease

func (*Lease) GetPartitionID

func (l *Lease) GetPartitionID() string

GetPartitionID returns the partition which belongs to this lease

func (*Lease) IncrementEpoch

func (l *Lease) IncrementEpoch() int64

IncrementEpoch increase the time on the lease by one

func (*Lease) String

func (l *Lease) String() string

type LeaseMarker

type LeaseMarker interface {
	GetPartitionID() string
	IsExpired(context.Context) bool
	GetOwner() string
	IncrementEpoch() int64
	GetEpoch() int64
	String() string
}

LeaseMarker provides the functionality expected of a partition lease with an owner

type Leaser

type Leaser interface {
	io.Closer
	StoreProvisioner
	EventProcessHostSetter
	GetLeases(ctx context.Context) ([]LeaseMarker, error)
	EnsureLease(ctx context.Context, partitionID string) (LeaseMarker, error)
	DeleteLease(ctx context.Context, partitionID string) error
	AcquireLease(ctx context.Context, partitionID string) (LeaseMarker, bool, error)
	RenewLease(ctx context.Context, partitionID string) (LeaseMarker, bool, error)
	ReleaseLease(ctx context.Context, partitionID string) (bool, error)
	UpdateLease(ctx context.Context, partitionID string) (LeaseMarker, bool, error)
}

Leaser provides the functionality needed to persist and coordinate leases for partitions

type Receiver

type Receiver interface {
	Receive(ctx context.Context, handler eventhub.Handler) (close func() error, err error)
}

Receiver provides the ability to handle Event Hub events

type StoreProvisioner

type StoreProvisioner interface {
	StoreExists(ctx context.Context) (bool, error)
	EnsureStore(ctx context.Context) error
	DeleteStore(ctx context.Context) error
}

StoreProvisioner provides CRUD functionality for Lease and Checkpoint storage

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL